如何在服务器上的缓冲区中存在持续的消息流时检索字符串?
我使用Netty实现了一个基本的客户端和服务器程序。客户端向服务器发送一组消息字符串,服务器接受并显示给终端。我可以为任何数量的字符串执行此操作。但是有时候这些字符串会被剪切成一半或任意大小,并显示在服务器上。有什么办法可以消除这种情况吗?我们正在从客户端发送QuickFix字符串。如何在服务器上的缓冲区中存在持续的消息流时检索字符串?
下面是我的样品的QuickFix字符串:
8 = FIX.4.29 = 0007935 = A49 = TTDS68AO56 = RaviEx34 = 152 = 20170427-14:05:04.572108 = 60141 = Y98 = 010 = 242
上述字符串中的字符“r”是SOH字符。现在我的要求是:当我从ClientHandler向服务器循环发送多个字符串时,一些字符串会自动剪切并显示在服务器端(可能由于Netty的速度)。我想消除这一点。每个字符串从“8 =”开始,以“10 = xxx”结束。任何人都可以帮助我从连续缓冲区中检索字符串,因为所有的字符串都被插入缓冲区。
现在,当我运行我的下面的代码时,有时候我会得到我的字符串,有时会显示异常。异常是由于半字符串。在FIXMESSAGEDECODER类中,我编写了用于检索以“8 =”开始并以“10 =”结尾的字符串的逻辑。
任何人都可以帮助我如何从缓冲区中准确地检索消息字符串,而不会呕吐消息的任何部分。
我的客户代码:
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception{
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = b.connect().sync();
future.channel().closeFuture().sync();
}
finally {
group.shutdownGracefully().sync();
}
}
public static void main (String [] args) throws Exception {
new EchoClient("127.0.0.1", 11235).start();
}
}
我ClientHandler的:
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{
@Override
public void channelActive(ChannelHandlerContext ctx){
System.out.println("Connected");
int i=0;
while(i<100){
ctx.writeAndFlush(Unpooled.copiedBuffer("8=FIX.4.29=0007935=A49=TTDS68AO56=RaviEx34=152=20170427-14:05:04.572108=60141=Y98=010=242\n",
CharsetUtil.UTF_8));
i++;
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
我的服务器:
public class EchoServer{
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
System.out.println("New client connected: " + ch.localAddress());
ch.pipeline().addLast(new FixMessageDecoder());
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
}
finally {
group.shutdownGracefully().sync();
}
}
public static void main (String [] args) throws Exception {
new EchoServer(11235).start();
}
}
我FixMessage解码器:
public class FixMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{
String messg = in.toString(CharsetUtil.UTF_8);
String str = messg.substring(messg.indexOf("8"), messg.lastIndexOf("10")+6);
System.out.println(str);
}
}
不幸的是,不能保证你的字符串马上就来为一体。可能是当解码器第一次调用时,缓冲区将包含一个字符串,例如[8 = ...],第二次调用时会有[... 10 = XXX]。另一点是你可以得到你的字符串和下一个字符串的一部分,如[8 = ... 10 = XXX8 = ...]。而且你必须考虑比数字8和10更好的线条探测器。如果你确定在8=
和10=XXX
这样的线条模式下只能使用一次,那就使用它。
我可以建议你改写你的解码器测试驱动开发样式。幸运的是,解码器非常容易测试。首先你写了很多测试,你可以想象你在那里描述传入缓冲区的很多可能的变体(完整的字符串,字符串的一部分,两个字符串)。然后你写你的解码器来通过所有这些测试。
你的解码器是完全错误的。不必将整个缓冲区转换为字符串,然后将其大部分抛弃,必须确保仅消耗缓冲区中作为正在解码的消息的一部分的字节,并将下一个缓冲区中的字节留给下一个时间。我怀疑你是在这里以正确的方式识别信息的结尾。请参阅FIX协议规范.. – EJP