在JAVA中提高Netty服务器的性能

在JAVA中提高Netty服务器的性能

问题描述:

我有这样的情况:我的Netty服务器将以惊人的速度从客户端获取数据。我认为客户正在使用某种速度的PUSH机制。我不知道PUSH-POP机制究竟是什么,但我确实认为客户端正在使用某种机制以非常高的速度发送数据。现在我的要求是,我编写了一个简单的TCP Netty服务器,它接收来自客户端,并添加到使用ArrayBlockingQueue实现的BlockingQueue中。现在,由于Netty是基于事件的,因此接收数据并将其存储在队列中所需的时间就更多了,这就引发了客户端的一个异常情况,即Netty服务器没有运行。但是我的服务器运行得很完美,但接受单个数据并将其存储在队列中的时间更多。我怎样才能防止这一点?这种情况有没有最快排队?我使用BlockingQueue,因为另一个线程会从队列中取数据并处理它。所以我需要一个同步队列。我该如何提高服务器的性能,或者有什么方法以非常高的速度插入数据?我所关心的只是延迟。延迟需要尽可能低。在JAVA中提高Netty服务器的性能

我的服务器代码:

public class Server implements Runnable { 
private final int port; 

static String message; 
Channel channel; 
ChannelFuture channelFuture; 
int rcvBuf, sndBuf, lowWaterMark, highWaterMark; 

public Server(int port) { 
    this.port = port; 
    rcvBuf = 2048; 
    sndBuf = 2048; 
    lowWaterMark = 1024; 
    highWaterMark = 2048; 
} 

@Override 
public void run() { 
    try { 
     startServer(); 

    } catch (Exception ex) { 
     System.err.println("Error in Server : "+ex); 
     Logger.error(ex.getMessage()); 
    } 
} 

public void startServer() { 
    // System.out.println("8888 Server started"); 
    EventLoopGroup group = new NioEventLoopGroup(); 
    try { 
     ServerBootstrap b = new ServerBootstrap(); 
     b.group(group) 
     .channel(NioServerSocketChannel.class) 
     .localAddress(new InetSocketAddress(port)) 
     .childOption(ChannelOption.SO_RCVBUF, rcvBuf * 2048) 
     .childOption(ChannelOption.SO_SNDBUF, sndBuf * 2048) 
     .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(lowWaterMark * 2048, highWaterMark * 2048)) 
     .childOption(ChannelOption.TCP_NODELAY, true) 

      .childHandler(new ChannelInitializer<SocketChannel>() { 

        @Override 
        public void initChannel(SocketChannel ch) throws Exception { 
         channel = ch; 
         System.err.println("OMS connected : " + ch.localAddress()); 
         ch.pipeline().addLast(new ReceiveFromOMSDecoder()); 
        } 
       }); 
     channelFuture = b.bind(port).sync(); 
     this.channel = channelFuture.channel(); 
     channelFuture.channel().closeFuture().sync(); 

    } catch (InterruptedException ex) { 
     System.err.println("Exception raised in SendToOMS class"+ex); 
    } finally { 
     group.shutdownGracefully(); 
    } 
} 

}

我ServerHandler代码:

@Sharable 
public class ReceiveFromOMSDecoder extends MessageToMessageDecoder<ByteBuf> { 


    private Charset charset; 
    public ReceiveFromOMSDecoder() { 
     this(Charset.defaultCharset()); 
    } 

    /** 
    * Creates a new instance with the specified character set. 
    */ 
    public ReceiveFromOMSDecoder(Charset charset) { 
     if (charset == null) { 
      throw new NullPointerException("charset"); 
     } 
     this.charset = charset; 
    } 

    @Override 
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { 

     String buffer = msg.toString(charset); 
     if(buffer!=null){ 
     Server.sq.insertStringIntoSendingQueue(buffer); //inserting into queue 
     } 
     else{ 
      Logger.error("Null string received"+buffer); 
     } 
    } 
     @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
     // Logger.error(cause.getMessage()); 
     System.err.println(cause); 
    } 
} 
+0

我投票结束这个问题为“太宽泛”,因为您似乎有多个问题。我从上下文中挑出的一个问题是,在客户端或服务器端抛出异常,另一个问题是性能问题。性能问题本身也很大,并且有很多原因。如果您希望接收大量数据,那么如果将所有协议数据保留为字符串,并且将每个数据包直接传递给您的服务器类,在查看线程争用时也会给出其自己的开销 – Ferrybig

三个快速问答:

  1. 看起来不像你发送一个响应。你可能应该。
  2. 不要阻塞IO线程。使用EventExecutorGroup来分派传入有效负载的处理。即类似ChannelPipeline.addLast(EventExecutorGroup group, String name, ChannelHandler handler)
  3. 只是不阻止一般。抛开你的ArrayBlockingQueue并看看JCTools或其他一些实现来寻找一个非阻塞模拟。
+0

谢谢您很快就会遇到问题尼古拉斯。目前我正在使用ConcurrentLinkedQueue而不是ArrayBlockingQueue。我在JCTools上找不到任何示例。我在使用它时非常困惑。我已经浏览了图书馆,并且知道SPSC队列是我设计的队列。但我不知道如何使用它。你能否给我提供一些使用图书馆的例子或者任何文件请致电 –

+0

hema;我会为JCTools查询开一个单独的问题。 – Nicholas