Netty(4):核心部件: ChannelPipeline & ChannelHandler 处理链
ChannelHandler
Channel 生命周期
Netty 的 Channel 含有以下 4 个生命周期:
-
channelUnregistered - channel已创建但未注册到一个 EventLoop.
-
channelRegistered - channel 注册到一个 EventLoop.
-
channelActive - channel 变为活跃状态(连接到了远程主机),现在可以接收和发送数据了
-
channelInactive - channel 处于非活跃状态,没有连接到远程主机
ChannelHandler 生命周期
Netty 的 ChannelHandler 拥有以下生命周期,分别对应 ChannelHandler 添加到 ChannelPipeline,从 ChannelPipeline 中删除的动作,ChannelHandler 提供了以下的成员方法对应这些生命周期:
handlerAdded(ChannelHandlerContext) | 当 ChannelHandler 添加到 ChannelPipeline 调用 |
handlerRemoved(ChannelHandlerContext) | 当 ChannelHandler 从 ChannelPipeline 移除时调用 |
exceptionCaught(ChannelHandlerContext,Throwable) | 当 ChannelPipeline 执行抛出异常时调用 |
ChannelHandler 子接口
Netty 提供2个重要的 ChannelHandler 子接口:
- ChannelInboundHandler - 处理进站数据和所有状态更改事件
-
ChannelOutboundHandler - 处理出站数据,允许拦截各种操作
同时对于这些子接口,为了方便实现类,Netty 还分别提供了相应的适配器类 ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter、ChannelHandlerAdapter 分别用于覆盖实现 ChannelHandler;
ChannelInboundHandler
ChannelInboundHandler 的生命周期方法如下,当接收到数据或者与之关联的 Channel 状态改变时调用:
channelRegistered(ctx:ChannelHandlerContext) | 当 Channel 注册了 EventLoop ,并可以开始处理 I/O 时调用; |
channelUnregistered(ctx:ChannelHandlerContext) | 当 Channel 取消注册 EventLoop 时调用; |
channelActive(ctx:ChannelHandlerContext) | 当 Channel 处于活跃状态,连接到远程主机时调用; |
channelInactive(ctx:ChannelHandlerContext) | 当 Channel 处于非活跃状台,从远程主机断开连接或者没有来连接时调用; |
channelReadComplete(ctx:ChannelHandlerContext) | 当 Channel 上读操作完成时调用; |
channelRead(ctx:ChannelHandlerContext) | 当数据从 Channel 上读取时调用; |
channelWritabilityChanged(ctx:ChannelHandlerContext) | 当 Channel 的可写状态发生变化时调用,用户可以控制通道写入速度避免内存溢出,当通道重新可写时会触发该事件,可以在该方法重新进行写操作,可以通过 isWritable() 来检查 channel 是否可写; |
userEventTriggered(ctx:ChannelHandlerContext, evt:Object) | 当用户调用 Channel.fireUserEventTriggered() 通过 ChannelPipeline 传递 POJO 时调用;可以处理传输到 ChannelPipeline 的特定事件; |
需要注意的是,ChannelInboundHandler 实现覆盖了 channelRead() 方法处理读取的数据,在使用完资源后,要注意响应释放资源。Netty 在 ByteBuf 上使用了资源池,所以当执行释放资源时可以减少内存的消耗。
Sharable .
public class HelloHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
...
ReferenceCountUtil.release(msg); //通过引用计数器手动释放资源
}
}
如果这样手动释放资源的方式过于繁琐,可以考虑使用 SimpleChannelInboundHandler,该子类已经实现了资源的自动释放;
Sharable .
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> {
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
//不需要手动释放资源 msg, SimpleChannelInboundHandler 会自动释放资源
}
}
ChannelOutboundHandler
ChannelOutboundHandler 提供了出站操作时调用的方法,这些方法会被 Channel, ChannelPipeline, 和 ChannelHandlerContext 调用;
ChannelOutboundHandler 另个一个强大的方面是它具有在请求时延迟操作或者事件的能力。比如,在写数据到 remote peer 的过程中被意外暂停,可以延迟执行刷新操作,然后在迟些时候继续;
bind(ChannelHandlerContext,SocketAddress, ChannelPromise) | 当 Channel 绑定到本地地址时调用; |
connect(ChannelHandlerContext,local:SocketAddress,remote:SocketAddress, ChannelPromise) | 当 Channel 连接到远程地址时调用; |
read(ChannelHandlerContex) | 当数据从 Channel 中读取时调用; |
write(ChannelHandlerContex,msg:Object,ChannelPromise) | 当数据写入到 Channel 中时调用; |
flush(ChannelHandlerContex) | 当通过 Channel 冲刷缓存数据到远程用户时调用; |
disconnect(ChannelHandlerContex,ChannelPromise) | 当 Channel 从远程地址断开连接时调用; |
deregister(ChannelHandlerContex,ChannelPromise) | 当 Channel 取消注册 EventLoop 时调用; |
close(ChannelHandlerContex,ChannelPromise) | 当 Channel 关闭时调用; |
ChannelPromise 是特殊的 ChannelFuture,允许 ChannelPromise 及其操作成功或失败。所以任何时候调用例如 Channel.write(...) 一个新的 ChannelPromise将会创建并且通过 ChannelPipeline传递。这次写操作本身将会返回 ChannelFuture, 这样只允许你得到一次操作完成的通知。Netty 本身使用 ChannelPromise 作为返回的 ChannelFuture 的通知,事实上在大多数时候就是 ChannelPromise 自身(ChannelPromise 扩展了 ChannelFuture)
同样的,类似 ChannelInBoundHandler ,在使用 write 向通道写入数据,在消费完该数据后,需要注意释放该资源对象的引用计数器,同样可以直接使用 SimpleChannelOutBoundHandler ,可以自动释放资源的引用计数;
ChannelPipeline
ChannelPipeline 是 ChannelHandler 实例的容器,流经一个 Channel 的入站和出站事件可以被 ChannelPipeline 拦截,ChannelPipeline能够让用户自己对入站/出站事件的处理逻辑,以及pipeline里的各个Handler之间的交互进行定义。
每当一个新的 Channel 被创建了,都会建立一个新的 ChannelPipeline,并且这个新的 ChannelPipeline 还会绑定到 Channel 上,这个关联是永久性的;
根据它的起源,一个事件将由 ChannelInboundHandler 或 ChannelOutboundHandler 处理。随后它将调用 ChannelHandlerContext 实现转发到下一个相同的超类型的处理程序。
ChannelPipeline 操纵 ChannelHandler
可以实时修改 ChannelPipeline 的布局,通过添加、移除、替换其中的 ChannelHandler;
addFirst([name:String,] ChannelHandler) | 向 ChannelPipeline 头部添加 ChannelHandler; |
addLast([name:String,] ChannelHandler) | 向 ChannelPipeline 尾部添加 ChannelHandler; |
addBefore (baseName:String,name:String, ChannelHandler) | 向指定的 ChannelHandler 之前添加 ChannelHandler; |
addAfter (baseName:String,name:String,ChannelHandler) | 向指定的 ChannelHandler 之后添加 ChannelHandler; |
remove(name:String / ChannelHandler) | 删除指定 ChannelHandler; |
replace(old:String,new:String,ChannelHandler) | 替换指定 ChannelHandler; |
以下的 API 用于查看 ChannelPipeline 中的 ChannelHandler :
get(name:String) | 获取指定的 ChannelHandler; |
List<String> names() | 获取 ChannelPipeline 的所有 ChannelHandler 列表 |
iterator() | 获取 ChannelPipeline 的所有 ChannelHandler 的迭代器 |
context(name:String / ChannelHandler) | 获取指定 ChannelHandler 的 ChannelHandlerContext |
示例:
ChannelPipeline pipeline = null;
FirstHandler firstHandler = new FirstHandler();
pipeline.addLast("handler1", firstHandler);
pipeline.addFirst("handler2", new SecondHandler());
pipeline.addAfter("handler2","handler3", new ThirdHandler());
pipeline.remove("handler3");
pipeline.remove(firstHandler);
pipeline.replace("handler2", "handler4", new ForthHandler());
ChannelPipeline 向 Channel 发送事件
ChannelPipeline API 有额外调用入站和出站操作的方法;
入站操作 API,用于向 ChannelPipeline 中 ChannelInboundHandlers 通知正在发生的事件:
fireChannelRegistered() | 向下一个 ChannelInboundHandler 通知 ChannelRegistered 事件; |
fireChannelUnregistered() | 向下一个 ChannelInboundHandler 通知 ChannelUnregistered 事件; |
fireChannelActive() | 向下一个 ChannelInboundHandler 通知 ChannelActive 事件; |
fireChannelInactive() | 向下一个 ChannelInboundHandler 通知 ChannelInactive 事件; |
fireExceptionCaught() | 向下一个 ChannelInboundHandler 通知 ExceptionCaught事件; |
fireUserEventTriggered() | 向下一个 ChannelInboundHandler 通知 UserEventTriggered事件; |
fireChannelRead(msg:Object) | 向下一个 ChannelInboundHandler 通知 ChannelRead 事件; |
fireChannelReadComplete() | 向下一个 ChannelInboundHandler 通知 ChannelReadComplete 事件; |
出站操作 API,用于向 ChannelPipeline 中 ChannelOutboundHandlers 通知正在发生的事件:
bind(local:SocketAddress , ChannelPromise ) | 向下一个 ChannelOutboundHandler 发送 bind 事件 |
connect(local:SocketAddress,remoteLSocketAddress[,ChannelPromise]) | 向下一个 ChannelOutboundHandler 发送 connect 事件 |
disconnect([ChannelPromise]) | 向下一个 ChannelOutboundHandler 发送 disconnect 事件 |
close() | 向下一个 ChannelOutboundHandler 发送 close 事件 |
deregister() | 向下一个 ChannelOutboundHandler 发送 disregister 事件 |
flush() | 向下一个 ChannelOutboundHandler 发送 flush 事件 |
write(msg:Object [,ChannelPromise]) | 向下一个 ChannelOutboundHandler 发送 write 事件 |
writeAndFlush(msg:Object [,ChannelPromise]) | 向下一个 ChannelOutboundHandler 发送 write 和 flush 事件 |
read() | 向下一个 ChannelOutboundHandler 发送 read 事件 |
ChannelHandlerContext
在ChannelHandler 添加到 ChannelPipeline 时会创建一个实例,即 ChannelHandlerContext,它代表了 ChannelHandler 和ChannelPipeline 之间的关联。接口ChannelHandlerContext 主要是对通过同一个 ChannelPipeline 关联的 ChannelHandler 之间的交互进行管理;
① Channel 绑定到 ChannelPipeline;
② ChannelPipeline 绑定到 包含 ChannelHandler 的 Channel;
③ ChannelHandler;
④ 当添加 ChannelHandler 到 ChannelPipeline 时,ChannelHandlerContext 被创建;
ChannelHandlerContext 中包含了有许多方法,其中一些方法也出现在 Channel 和ChannelPipeline 本身(如 bind、connect、register、fireChannelRead 等)。如果您通过Channel 或ChannelPipeline 的实例来调用这些方法,他们就会在整个 pipeline中传播 。相比之下,一样的方法在 ChannelHandlerContext 的实例上调用, 就只会从当前的 ChannelHandler 开始并传播到相关管道中的下一个有处理事件能力的 ChannelHandler ;
ChannelHandlerContext 与 ChannelHandler 的关联从不改变,所以缓存它的引用是安全的。
ChannelHandlerContext 所包含的事件流比其他类中同样的方法都要短,利用这一点可以尽可能高地提高性能。
从 ChannelHandlerContext 获取 Channel、ChannelPipeline 并传播事件
从 ChannelHandlerContext 获取 Channel,并写入数据:
ChannelHandlerContext ctx = context;
Channel channel = ctx.channel();
channel.write(Unpooled.copiesBuffer("Hello World!", CharsetUtil.UTF_8));
从 ChannelHandlerContext 获取 ChannelPipeline,并写入数据:
ChannelHandlerContext ctx = context;
ChannelPipeline pipeline = ctx.pipeline();
pipeline.write(Unpooled.copiedBuffer("Hello World!", CharsetUtil.UTF_8));
以上两个例子中,虽然在 Channel 、ChannelPipeline 上调用write() 都会把事件在整个管道传播,但是在 ChannelHandler 级别上,从一个处理程序转到下一个却要通过在 ChannelHandlerContext 调用方法实现,如下:
① 事件传递给 ChannelPipeline 的第一个 ChannelHandler
② ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个
③ ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个
指定某个事件由特定的 ChannelHandler 处理
如果实现从一个特定的 ChannelHandler 开始处理事件,必须引用当前 ChannelHandler 的前一个 ChannelHandler 关联的 ChannelHandlerContext ,这样才能使得事件从当前的 ChannelHanlder 开始处理,如下:
如果要由 ChannelHandler3 来处理写入事件,需要获取 ChannelHandler2 的ChannelHandlerContext 写入消息:
以下是一种实现方式,在 ChannelHandler2 中储存自身的 ChannelHandlerContext,并提供一个 ChannelHandlerContext 的外部调用方法(在本例中为发送信息);
Sharable .
public class ChannelHandler2 extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx; //储存ChannelHandler 本身的 ChannelHandlerContext
}
public void send(String msg) {
ctx.writeAndFlush(msg); //提供自身 ChannelHandlerContext 的外部调用方法
}
}
之后在任意地方只要获取/创建 ChannelHandler2 的实例,并调用其 send 方法,由该方法写入的 msg 会向后面的 ChannelHandler 传播,并会被 ChannelHandler3 的 channelRead() 方法所捕获;
在 ChannelHandlerContext 之间传递 channelRead 的 msg 消息对象
有一种常见的应用场景,在某个 ChannelHandler 获取到写入事件的消息后,需要由后面的 ChannelHandler 对该消息进行进一步的处理,典型的应用为日志处理器;
如下示例中,Handler1 处理完写入消息后,需要将该消息传递给 Handler2 进行日志处理,(假设处理器顺序为 Handler1 -> Handler2)可以如下实现:
Sharable .
public class Handler1 extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("Handle data " + in.toString(CharsetUtil.UTF_8)); //处理写入数据
ctx.fireChannelRead(msg); //传递一个 channelRead 事件给接下来的 ChannelHandler
}
}
Sharable .
public class Handler2 extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.println("log data " + in.toString(CharsetUtil.UTF_8)); //将获取的数据写入日志
}
}
※以上例子中使用 @Sharable 注解表明该 ChannelHandler 的实例在 Channel 之间是可以共享的,即多个 ChannelPipeline 可以并发访问该 ChannelHandler 实例,如果该 ChannelHandler 中含有持有状态的代码,不建议使用该注解,否则需要做好代码的状态线程安全;