如何在服务器上的缓冲区中存在持续的消息流时检索字符串?

如何在服务器上的缓冲区中存在持续的消息流时检索字符串?

问题描述:

我使用Netty实现了一个基本的客户端和服务器程序。客户端向服务器发送一组消息字符串,服务器接受并显示给终端。我可以为任何数量的字符串执行此操作。但是有时候这些字符串会被剪切成一半或任意大小,并显示在服务器上。有什么办法可以消除这种情况吗?我们正在从客户端发送QuickFix字符串。如何在服务器上的缓冲区中存在持续的消息流时检索字符串?

下面是我的样品的QuickFix字符串:

8 = FIX.4.29 = 0007935 = A49 = TTDS68AO56 = RaviEx34 = 152 = 20170427-14:05:04.572108 = 60141 = Y98 = 010 = 242

上述字符串中的字符“r”是SOH字符。现在我的要求是:当我从ClientHandler向服务器循环发送多个字符串时,一些字符串会自动剪切并显示在服务器端(可能由于Netty的速度)。我想消除这一点。每个字符串从“8 =”开始,以“10 = xxx”结束。任何人都可以帮助我从连续缓冲区中检索字符串,因为所有的字符串都被插入缓冲区。

现在,当我运行我的下面的代码时,有时候我会得到我的字符串,有时会显示异常。异常是由于半字符串。在FIXMESSAGEDECODER类中,我编写了用于检索以“8 =”开始并以“10 =”结尾的字符串的逻辑。

任何人都可以帮助我如何从缓冲区中准确地检索消息字符串,而不会呕吐消息的任何部分。

我的客户代码:

public class EchoClient { 

     private final String host; 
     private final int port; 

     public EchoClient(String host, int port) { 
      this.host = host; 
      this.port = port; 
     } 
    public void start() throws Exception{ 
    EventLoopGroup group = new NioEventLoopGroup(); 
    try{ 
    Bootstrap b = new Bootstrap(); 
    b.group(group).channel(NioSocketChannel.class) 
      .remoteAddress(new InetSocketAddress(host, port)) 
      .handler(new ChannelInitializer<SocketChannel>(){ 

      @Override 
      public void initChannel(SocketChannel ch) throws Exception{ 
      ch.pipeline().addLast(new EchoClientHandler()); 
      } 
      }); 

       ChannelFuture future = b.connect().sync(); 
       future.channel().closeFuture().sync(); 
     } 

      finally { 
       group.shutdownGracefully().sync(); 
      } 
     } 

     public static void main (String [] args) throws Exception { 
      new EchoClient("127.0.0.1", 11235).start(); 
     } 
    } 

我ClientHandler的:

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{ 

@Override 
public void channelActive(ChannelHandlerContext ctx){ 
    System.out.println("Connected"); 
    int i=0; 
    while(i<100){ 
ctx.writeAndFlush(Unpooled.copiedBuffer("8=FIX.4.29=0007935=A49=TTDS68AO56=RaviEx34=152=20170427-14:05:04.572108=60141=Y98=010=242\n", 
CharsetUtil.UTF_8)); 

    i++; 
    } 
} 

@Override 
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception { 
    System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8)); 
} 

@Override 
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ 
    cause.printStackTrace(); 
    ctx.close(); 
    } 
} 

我的服务器:

public class EchoServer{ 

private final int port; 
public EchoServer(int port) { 
     this.port = port; 
} 

public void start() throws Exception { 
    EventLoopGroup group = new NioEventLoopGroup(); 
    try { 
      ServerBootstrap b = new ServerBootstrap(); 
      b.group(group) 
      .channel(NioServerSocketChannel.class) 
      .localAddress(new InetSocketAddress(port)) 
      .childHandler(new ChannelInitializer<SocketChannel>() { 
      @Override 
       public void initChannel(SocketChannel ch) throws Exception { 
       System.out.println("New client connected: " + ch.localAddress()); 
       ch.pipeline().addLast(new FixMessageDecoder()); 
       } 
      }); 

     ChannelFuture f = b.bind().sync(); 
     f.channel().closeFuture().sync(); 
    } 
    finally { 
       group.shutdownGracefully().sync(); 
      } 
     } 

     public static void main (String [] args) throws Exception { 
     new EchoServer(11235).start(); 
     } 
} 

我FixMessage解码器:

public class FixMessageDecoder extends MessageToMessageDecoder<ByteBuf> { 

@Override 
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception{ 
    String messg = in.toString(CharsetUtil.UTF_8); 
    String str = messg.substring(messg.indexOf("8"), messg.lastIndexOf("10")+6); 
    System.out.println(str); 
} 
} 
+0

你的解码器是完全错误的。不必将整个缓冲区转换为字符串,然后将其大部分抛弃,必须确保仅消耗缓冲区中作为正在解码的消息的一部分的字节,并将下一个缓冲区中的字节留给下一个时间。我怀疑你是在这里以正确的方式识别信息的结尾。请参阅FIX协议规范.. – EJP

不幸的是,不能保证你的字符串马上就来为一体。可能是当解码器第一次调用时,缓冲区将包含一个字符串,例如[8 = ...],第二次调用时会有[... 10 = XXX]。另一点是你可以得到你的字符串和下一个字符串的一部分,如[8 = ... 10 = XXX8 = ...]。而且你必须考虑比数字8和10更好的线条探测器。如果你确定在8=10=XXX这样的线条模式下只能使用一次,那就使用它。

我可以建议你改写你的解码器测试驱动开发样式。幸运的是,解码器非常容易测试。首先你写了很多测试,你可以想象你在那里描述传入缓冲区的很多可能的变体(完整的字符串,字符串的一部分,两个字符串)。然后你写你的解码器来通过所有这些测试。