Netty:如何在服务器没有数据返回时异步等待?

Netty:如何在服务器没有数据返回时异步等待?

问题描述:

首先,感谢一个优秀的nio框架 - Netty。Netty:如何在服务器没有数据返回时异步等待?

我们的应用程序包括客户端和服务器部分,它们使用套接字连接进行通信并发送protobuf消息。客户端和服务器都是使用OIO编写的。 我正在将当前代码移动到基于netty框架的nio上。目前我有一个服务器实现的问题:我还没有找到如何使用netty的工作线程处理处理程序。

有简化的代码:

ServerBootstrap bootstrap = new ServerBootstrap(
    new NioServerSocketChannelFactory(
    Executors.newCachedThreadPool(), 
    Executors.newCachedThreadPool(), 
    Runtime.getRuntime().availableProcessors() 
) 
); 

// Set up the pipeline factory. 
bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 
    public ChannelPipeline getPipeline() throws Exception { 
    final ChannelPipeline pipeline = pipeline(); 
    // decoder 
    pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder()); 
    pipeline.addLast("protobufDecoder", new ProtobufDecoder(LogServerProto.Request.getDefaultInstance())); 
    // Encoder 
    pipeline.addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender()); 
    pipeline.addLast("protobufEncoder", new ProtobufEncoder()); 
    // handler 
    pipeline.addLast("handler", new SimpleChannelUpstreamHandler() { 
     @Override 
     public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent event) throws Exception { 
     logger.info("handleClientSocket in thread " + Thread.currentThread().getName()); 
     final MessageLite request = (MessageLite) event.getMessage(); 
     final MessageLite batch = storage.readBatch(request); 
     if (batch != null) { 
      event.getChannel().write(batch); 
     } else { 
      storage.registerListener(new Storage.StorageListener() { 
      @Override 
      public void onSaveBatch() { 
       storage.removeListener(this); 
       logger.info("Send upstream in thread " + Thread.currentThread().getName()); 
       event.getChannel().getPipeline().sendUpstream(event); 
      } 
      }); 
     } 
     } 
    }); 
    return pipeline; 
    } 
}); 

bootstrap.bind(new InetSocketAddress("localhost", 7000)); 

的服务器接受来自客户机的请求(protobuf的消息)。然后服务器要求“存储”进行“批处理”。存储返回一个对象,如果没有批处理当前可用,则返回null。如果批处理为非空 - 一切正常,但如果批处理不可用,则服务器应在将批处理添加到存储时将其返回给客户端。 因此,我正在注册将批次添加到存储时调用的StorageListener(注意:批次是从单独的线程'存储 - 写入线程'添加到存储中的)。 StorageListener使用旧的'request'对象发送上游流水线,流水线处理它并且我的“处理程序”将响应(批处理)写入通道。看起来一切正常,但所有这些任务都在'存储 - 写入 - 线程'线程中处理。在这种情况下,我如何更新监听器来处理netty的工作线程中的'处理程序'?

+0

尚不清楚,如果您有很多问题,请指定问题编号。 – 2012-01-09 15:45:33

+0

问题是当响应数据现在不可用时,服务器如何等待和写入响应。当然,我不应该在一个处理程序中锁定一个工作线程。 (我试图用数据变得可用时调用的监听器来实现,但是这个监听器不是从工作线程调用的,也许我应该以某种方式修改监听器,或者使用其他方法,而不是任何监听器) – Konstantin 2012-01-10 13:26:44

+1

可以从任何线程写入通道。是否可以将频道传递给StoreListener实现,然后使用Channels.write(....)写入频道? – johnstlr 2012-01-10 14:28:16

如果我的答案正确无误,您的问题不会被清除,您正在寻找一种方法在可用时回复结果,这样做需要有一个线程可用,直到数据可用并且给出结果。在你的情况下,存储类可以实现一个线程。

您不应该在工作线程中等待,也不需要在该线程中执行响应。要写从服务器到客户端的任何东西,您需要有权访问客户端的通道。所以你所需要的就是把这个通道作为参数传递给你的线程,然后从那个线程发送结果。