Netty:如何在服务器没有数据返回时异步等待?
问题描述:
首先,感谢一个优秀的nio框架 - Netty。Netty:如何在服务器没有数据返回时异步等待?
我们的应用程序包括客户端和服务器部分,它们使用套接字连接进行通信并发送protobuf消息。客户端和服务器都是使用OIO编写的。 我正在将当前代码移动到基于netty框架的nio上。目前我有一个服务器实现的问题:我还没有找到如何使用netty的工作线程处理处理程序。
有简化的代码:
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),
Runtime.getRuntime().availableProcessors()
)
);
// Set up the pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
final ChannelPipeline pipeline = pipeline();
// decoder
pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
pipeline.addLast("protobufDecoder", new ProtobufDecoder(LogServerProto.Request.getDefaultInstance()));
// Encoder
pipeline.addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender());
pipeline.addLast("protobufEncoder", new ProtobufEncoder());
// handler
pipeline.addLast("handler", new SimpleChannelUpstreamHandler() {
@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent event) throws Exception {
logger.info("handleClientSocket in thread " + Thread.currentThread().getName());
final MessageLite request = (MessageLite) event.getMessage();
final MessageLite batch = storage.readBatch(request);
if (batch != null) {
event.getChannel().write(batch);
} else {
storage.registerListener(new Storage.StorageListener() {
@Override
public void onSaveBatch() {
storage.removeListener(this);
logger.info("Send upstream in thread " + Thread.currentThread().getName());
event.getChannel().getPipeline().sendUpstream(event);
}
});
}
}
});
return pipeline;
}
});
bootstrap.bind(new InetSocketAddress("localhost", 7000));
的服务器接受来自客户机的请求(protobuf的消息)。然后服务器要求“存储”进行“批处理”。存储返回一个对象,如果没有批处理当前可用,则返回null。如果批处理为非空 - 一切正常,但如果批处理不可用,则服务器应在将批处理添加到存储时将其返回给客户端。 因此,我正在注册将批次添加到存储时调用的StorageListener(注意:批次是从单独的线程'存储 - 写入线程'添加到存储中的)。 StorageListener使用旧的'request'对象发送上游流水线,流水线处理它并且我的“处理程序”将响应(批处理)写入通道。看起来一切正常,但所有这些任务都在'存储 - 写入 - 线程'线程中处理。在这种情况下,我如何更新监听器来处理netty的工作线程中的'处理程序'?
答
如果我的答案正确无误,您的问题不会被清除,您正在寻找一种方法在可用时回复结果,这样做需要有一个线程可用,直到数据可用并且给出结果。在你的情况下,存储类可以实现一个线程。
您不应该在工作线程中等待,也不需要在该线程中执行响应。要写从服务器到客户端的任何东西,您需要有权访问客户端的通道。所以你所需要的就是把这个通道作为参数传递给你的线程,然后从那个线程发送结果。
尚不清楚,如果您有很多问题,请指定问题编号。 – 2012-01-09 15:45:33
问题是当响应数据现在不可用时,服务器如何等待和写入响应。当然,我不应该在一个处理程序中锁定一个工作线程。 (我试图用数据变得可用时调用的监听器来实现,但是这个监听器不是从工作线程调用的,也许我应该以某种方式修改监听器,或者使用其他方法,而不是任何监听器) – Konstantin 2012-01-10 13:26:44
可以从任何线程写入通道。是否可以将频道传递给StoreListener实现,然后使用Channels.write(....)写入频道? – johnstlr 2012-01-10 14:28:16