Netty(4):核心部件: ChannelPipeline & ChannelHandler 处理链

ChannelHandler

Channel 生命周期

Netty 的 Channel 含有以下 4 个生命周期:
  • channelUnregistered   - channel已创建但未注册到一个 EventLoop.
  • channelRegistered      - channel 注册到一个 EventLoop.
  • channelActive             - channel 变为活跃状态(连接到了远程主机),现在可以接收和发送数据了
  • channelInactive           - channel 处于非活跃状态,没有连接到远程主机
Netty(4):核心部件: ChannelPipeline & ChannelHandler 处理链

ChannelHandler 生命周期

Netty 的 ChannelHandler 拥有以下生命周期,分别对应 ChannelHandler 添加到 ChannelPipeline,从 ChannelPipeline 中删除的动作,ChannelHandler 提供了以下的成员方法对应这些生命周期:
handlerAdded(ChannelHandlerContext) 当 ChannelHandler 添加到 ChannelPipeline 调用
handlerRemoved(ChannelHandlerContext) 当 ChannelHandler 从 ChannelPipeline 移除时调用
exceptionCaught(ChannelHandlerContext,Throwable) 当 ChannelPipeline 执行抛出异常时调用

ChannelHandler 子接口

Netty 提供2个重要的 ChannelHandler 子接口:
  • ChannelInboundHandler - 处理进站数据和所有状态更改事件
  • ChannelOutboundHandler - 处理出站数据,允许拦截各种操作

同时对于这些子接口,为了方便实现类,Netty 还分别提供了相应的适配器类 ChannelInboundHandlerAdapter、ChannelOutboundHandlerAdapter、ChannelHandlerAdapter 分别用于覆盖实现 ChannelHandler;

ChannelInboundHandler

ChannelInboundHandler 的生命周期方法如下,当接收到数据或者与之关联的 Channel 状态改变时调用:
channelRegistered(ctx:ChannelHandlerContext) 当 Channel 注册了 EventLoop ,并可以开始处理 I/O 时调用;
channelUnregistered(ctx:ChannelHandlerContext) 当 Channel 取消注册 EventLoop 时调用;
channelActive(ctx:ChannelHandlerContext) 当 Channel 处于活跃状态,连接到远程主机时调用;
channelInactive(ctx:ChannelHandlerContext) 当 Channel 处于非活跃状台,从远程主机断开连接或者没有来连接时调用;
channelReadComplete(ctx:ChannelHandlerContext) 当 Channel 上读操作完成时调用;
channelRead(ctx:ChannelHandlerContext) 当数据从 Channel 上读取时调用;
channelWritabilityChanged(ctx:ChannelHandlerContext) 当 Channel 的可写状态发生变化时调用,用户可以控制通道写入速度避免内存溢出,当通道重新可写时会触发该事件,可以在该方法重新进行写操作,可以通过 isWritable() 来检查 channel 是否可写;
userEventTriggered(ctx:ChannelHandlerContext, evt:Object) 当用户调用 Channel.fireUserEventTriggered() 通过 ChannelPipeline 传递 POJO 时调用;可以处理传输到 ChannelPipeline 的特定事件;
一般没有特殊需求,只需要实现 channelRead() 方法的处理逻辑即可;

需要注意的是,ChannelInboundHandler 实现覆盖了 channelRead() 方法处理读取的数据,在使用完资源后,要注意响应释放资源。Netty 在 ByteBuf 上使用了资源池,所以当执行释放资源时可以减少内存的消耗。
 
@ChannelHandler.Sharable
public class HelloHandler extends ChannelInboundHandlerAdapter {        
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ...
        ReferenceCountUtil.release(msg); //通过引用计数器手动释放资源
    }
}
如果这样手动释放资源的方式过于繁琐,可以考虑使用 SimpleChannelInboundHandler,该子类已经实现了资源的自动释放;
 
@ChannelHandler.Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { 
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        //不需要手动释放资源 msg, SimpleChannelInboundHandler 会自动释放资源
    }
}

ChannelOutboundHandler

ChannelOutboundHandler 提供了出站操作时调用的方法,这些方法会被 Channel, ChannelPipeline, 和 ChannelHandlerContext 调用;
ChannelOutboundHandler 另个一个强大的方面是它具有在请求时延迟操作或者事件的能力。比如,在写数据到 remote peer 的过程中被意外暂停,可以延迟执行刷新操作,然后在迟些时候继续;
bind(ChannelHandlerContext,SocketAddress, ChannelPromise) 当 Channel 绑定到本地地址时调用;
connect(ChannelHandlerContext,local:SocketAddress,remote:SocketAddress, ChannelPromise) 当 Channel 连接到远程地址时调用;
read(ChannelHandlerContex) 当数据从 Channel 中读取时调用;
write(ChannelHandlerContex,msg:Object,ChannelPromise) 当数据写入到 Channel 中时调用;
flush(ChannelHandlerContex) 当通过 Channel 冲刷缓存数据到远程用户时调用;
disconnect(ChannelHandlerContex,ChannelPromise) 当 Channel 从远程地址断开连接时调用;
deregister(ChannelHandlerContex,ChannelPromise) 当 Channel 取消注册 EventLoop 时调用;
close(ChannelHandlerContex,ChannelPromise) 当 Channel 关闭时调用;

几乎所有的方法都将 ChannelPromise 作为参数,一旦在该阶段的请求结束要通过 ChannelPipeline 转发的时候,必须通知此参数。

ChannelPromise 是特殊的 ChannelFuture,允许 ChannelPromise 及其操作成功或失败。所以任何时候调用例如 Channel.write(...) 一个新的 ChannelPromise将会创建并且通过 ChannelPipeline传递。这次写操作本身将会返回 ChannelFuture, 这样只允许你得到一次操作完成的通知。Netty 本身使用 ChannelPromise 作为返回的 ChannelFuture 的通知,事实上在大多数时候就是 ChannelPromise 自身(ChannelPromise 扩展了 ChannelFuture)

同样的,类似 ChannelInBoundHandler ,在使用 write 向通道写入数据,在消费完该数据后,需要注意释放该资源对象的引用计数器,同样可以直接使用 SimpleChannelOutBoundHandler ,可以自动释放资源的引用计数;





ChannelPipeline


ChannelPipeline 是 ChannelHandler 实例的容器,流经一个 Channel 的入站和出站事件可以被 ChannelPipeline 拦截,ChannelPipeline能够让用户自己对入站/出站事件的处理逻辑,以及pipeline里的各个Handler之间的交互进行定义。

每当一个新的 Channel 被创建了,都会建立一个新的 ChannelPipeline,并且这个新的 ChannelPipeline 还会绑定到 Channel 上,这个关联是永久性的;

根据它的起源,一个事件将由 ChannelInboundHandler 或 ChannelOutboundHandler 处理。随后它将调用 ChannelHandlerContext 实现转发到下一个相同的超类型的处理程序。

Netty(4):核心部件: ChannelPipeline & ChannelHandler 处理链


ChannelPipeline 操纵 ChannelHandler

可以实时修改 ChannelPipeline 的布局,通过添加、移除、替换其中的 ChannelHandler;
addFirst([name:String,] ChannelHandler) 向 ChannelPipeline 头部添加 ChannelHandler;
addLast([name:String,] ChannelHandler) 向 ChannelPipeline 尾部添加 ChannelHandler;
addBefore (baseName:String,name:String, ChannelHandler) 向指定的 ChannelHandler 之前添加 ChannelHandler;
addAfter (baseName:String,name:String,ChannelHandler) 向指定的 ChannelHandler 之后添加 ChannelHandler;
remove(name:String / ChannelHandler) 删除指定 ChannelHandler;
replace(old:String,new:String,ChannelHandler) 替换指定 ChannelHandler;

以下的 API 用于查看 ChannelPipeline 中的 ChannelHandler :
get(name:String) 获取指定的 ChannelHandler;
List<String> names() 获取 ChannelPipeline 的所有 ChannelHandler 列表
iterator() 获取 ChannelPipeline 的所有 ChannelHandler 的迭代器
context(name:String / ChannelHandler) 获取指定 ChannelHandler 的 ChannelHandlerContext

示例:
 
ChannelPipeline pipeline = null; 
FirstHandler firstHandler = new FirstHandler(); 
pipeline.addLast("handler1", firstHandler); 
pipeline.addFirst("handler2", new SecondHandler()); 
pipeline.addAfter("handler2","handler3", new ThirdHandler()); 
pipeline.remove("handler3"); 
pipeline.remove(firstHandler); 
pipeline.replace("handler2", "handler4", new ForthHandler()); 

ChannelPipeline 向 Channel 发送事件

ChannelPipeline API 有额外调用入站和出站操作的方法;
入站操作 API,用于向 ChannelPipeline 中 ChannelInboundHandlers 通知正在发生的事件:
fireChannelRegistered() 向下一个 ChannelInboundHandler 通知 ChannelRegistered 事件;
fireChannelUnregistered() 向下一个 ChannelInboundHandler 通知 ChannelUnregistered 事件;
fireChannelActive() 向下一个 ChannelInboundHandler 通知 ChannelActive 事件;
fireChannelInactive() 向下一个 ChannelInboundHandler 通知 ChannelInactive 事件;
fireExceptionCaught() 向下一个 ChannelInboundHandler 通知 ExceptionCaught事件;
fireUserEventTriggered() 向下一个 ChannelInboundHandler 通知 UserEventTriggered事件;
fireChannelRead(msg:Object) 向下一个 ChannelInboundHandler 通知 ChannelRead 事件;
fireChannelReadComplete() 向下一个 ChannelInboundHandler 通知 ChannelReadComplete 事件;

出站操作 API,用于向 ChannelPipeline 中 ChannelOutboundHandlers 通知正在发生的事件:
bind(local:SocketAddress , ChannelPromise ) 向下一个 ChannelOutboundHandler 发送 bind 事件
connect(local:SocketAddress,remoteLSocketAddress[,ChannelPromise]) 向下一个 ChannelOutboundHandler 发送 connect 事件
disconnect([ChannelPromise]) 向下一个 ChannelOutboundHandler 发送 disconnect 事件
close() 向下一个 ChannelOutboundHandler 发送 close 事件
deregister() 向下一个 ChannelOutboundHandler 发送 disregister 事件
flush() 向下一个 ChannelOutboundHandler 发送 flush 事件
write(msg:Object [,ChannelPromise]) 向下一个 ChannelOutboundHandler 发送 write 事件
writeAndFlush(msg:Object [,ChannelPromise]) 向下一个 ChannelOutboundHandler 发送 write 和 flush 事件
read() 向下一个 ChannelOutboundHandler 发送 read 事件




ChannelHandlerContext


在ChannelHandler 添加到 ChannelPipeline 时会创建一个实例,即 ChannelHandlerContext,它代表了 ChannelHandler 和ChannelPipeline 之间的关联。接口ChannelHandlerContext 主要是对通过同一个 ChannelPipeline 关联的 ChannelHandler 之间的交互进行管理;
Netty(4):核心部件: ChannelPipeline & ChannelHandler 处理链
① Channel 绑定到 ChannelPipeline;
② ChannelPipeline 绑定到 包含 ChannelHandler 的 Channel;
③ ChannelHandler;
④ 当添加 ChannelHandler 到 ChannelPipeline 时,ChannelHandlerContext 被创建;

ChannelHandlerContext 中包含了有许多方法,其中一些方法也出现在 Channel 和ChannelPipeline 本身(如 bind、connect、register、fireChannelRead 等)。如果您通过Channel 或ChannelPipeline 的实例来调用这些方法,他们就会在整个 pipeline中传播 。相比之下,一样的方法在 ChannelHandlerContext 的实例上调用, 就只会从当前的 ChannelHandler 开始并传播到相关管道中的下一个有处理事件能力的 ChannelHandler ;

ChannelHandlerContext 与 ChannelHandler 的关联从不改变,所以缓存它的引用是安全的。
ChannelHandlerContext 所包含的事件流比其他类中同样的方法都要短,利用这一点可以尽可能高地提高性能。

从 ChannelHandlerContext 获取 Channel、ChannelPipeline 并传播事件

从 ChannelHandlerContext 获取 Channel,并写入数据:
 
ChannelHandlerContext ctx = context;
Channel channel = ctx.channel(); 
channel.write(Unpooled.copiesBuffer("Hello World!", CharsetUtil.UTF_8));  
从 ChannelHandlerContext 获取 ChannelPipeline,并写入数据:
 
ChannelHandlerContext ctx = context;
ChannelPipeline pipeline = ctx.pipeline(); 
pipeline.write(Unpooled.copiedBuffer("Hello World!", CharsetUtil.UTF_8));  
以上两个例子中,虽然在 Channel 、ChannelPipeline 上调用write() 都会把事件在整个管道传播,但是在 ChannelHandler 级别上,从一个处理程序转到下一个却要通过在 ChannelHandlerContext 调用方法实现,如下:
Netty(4):核心部件: ChannelPipeline & ChannelHandler 处理链
① 事件传递给 ChannelPipeline 的第一个 ChannelHandler
② ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个
③ ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个

指定某个事件由特定的 ChannelHandler 处理

如果实现从一个特定的 ChannelHandler 开始处理事件,必须引用当前 ChannelHandler 的前一个 ChannelHandler 关联的 ChannelHandlerContext ,这样才能使得事件从当前的 ChannelHanlder 开始处理,如下:
如果要由 ChannelHandler3 来处理写入事件,需要获取 ChannelHandler2 的ChannelHandlerContext 写入消息:
Netty(4):核心部件: ChannelPipeline & ChannelHandler 处理链
以下是一种实现方式,在 ChannelHandler2 中储存自身的 ChannelHandlerContext,并提供一个 ChannelHandlerContext 的外部调用方法(在本例中为发送信息);
 
@ChannelHandler.Sharable 
public class  ChannelHandler2 extends ChannelHandlerAdapter {
    private ChannelHandlerContext ctx;
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.ctx = ctx;        //储存ChannelHandler 本身的 ChannelHandlerContext
    }
    public void send(String msg) {
        ctx.writeAndFlush(msg);  //提供自身 ChannelHandlerContext 的外部调用方法
    }
}
之后在任意地方只要获取/创建 ChannelHandler2 的实例,并调用其 send 方法,由该方法写入的 msg 会向后面的 ChannelHandler 传播,并会被 ChannelHandler3 的 channelRead() 方法所捕获;

在 ChannelHandlerContext 之间传递 channelRead 的 msg 消息对象

有一种常见的应用场景,在某个 ChannelHandler 获取到写入事件的消息后,需要由后面的 ChannelHandler 对该消息进行进一步的处理,典型的应用为日志处理器;
如下示例中,Handler1 处理完写入消息后,需要将该消息传递给 Handler2 进行日志处理,(假设处理器顺序为 Handler1 -> Handler2)可以如下实现:
 
@ChannelHandler.Sharable 
public class Handler1 extends ChannelInboundHandlerAdapter {
   @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Handle data " + in.toString(CharsetUtil.UTF_8));  //处理写入数据
        ctx.fireChannelRead(msg);                  //传递一个 channelRead 事件给接下来的 ChannelHandler
    }
}
 
@ChannelHandler.Sharable 
public class Handler2 extends ChannelInboundHandlerAdapter {
   @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("log data " + in.toString(CharsetUtil.UTF_8));  //将获取的数据写入日志
    }
}

※以上例子中使用 @Sharable 表明该 ChannelHandler 的实例在 Channel 之间是可以共享的,即多个 ChannelPipeline 可以并发访问该 ChannelHandler 实例,如果该 ChannelHandler 中含有持有状态的代码,不建议使用该注解,否则需要做好代码的状态线程安全;