Netty 源码分析 06 ChannelPipeline
2.1 ChannelInboundInvoker
io.netty.channel.ChannelInboundInvoker
,Channel Inbound Invoker( 调用者 ) 接口
2.2 ChannelOutboundInvoker
io.netty.channel.ChannelOutboundInvoker
,Channel Outbound Invoker( 调用者 ) 接口
2.3 Outbound v.s Inbound 事件
在 《Netty 源码分析之 二 贯穿Netty 的大动脉 ── ChannelPipeline (二)》 中,笔者看到一个比较不错的总结
对于 Outbound 事件:
- Outbound 事件是【请求】事件(由 Connect 发起一个请求, 并最终由 Unsafe 处理这个请求)
- Outbound 事件的发起者是 Channel
- Outbound 事件的处理者是 Unsafe
-
Outbound 事件在 Pipeline 中的传输方向是
tail
->head
在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Handler, 则需要调用 ctx.xxx
(例如 ctx.connect
) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.
对于 Inbound 事件:
- Inbound 事件是【通知】事件, 当某件事情已经就绪后, 通知上层.
- Inbound 事件发起者是 Unsafe
- Inbound 事件的处理者是 TailContext, 如果用户没有实现自定义的处理方法, 那么Inbound 事件默认的处理者是 TailContext, 并且其处理方法是空实现.
- Inbound 事件在 Pipeline 中传输方向是
head
( 头 ) ->tail
( 尾 )
3. DefaultChannelPipeline
io.netty.channel.DefaultChannelPipeline
,实现 ChannelPipeline 接口,默认 ChannelPipeline 实现类。???? 实际上,也只有这个实现类
nameCaches
静态属性,名字( AbstractChannelHandlerContext.name
)缓存 ,基于 ThreadLocal ,用于生成在线程中唯一的名字。
pipeline 中的节点的数据结构是 ChannelHandlerContext 类。每个 ChannelHandlerContext 包含一个 ChannelHandler、它的上下节点( 从而形成 ChannelHandler 链 )、以及其他上下文。
默认情况下,pipeline 有 head
和 tail
节点,形成默认的 ChannelHandler 链。而我们可以在它们之间,加入自定义的 ChannelHandler 节点
4. ChannelHandlerContext
io.netty.channel.ChannelHandlerContext
,继承 ChannelInboundInvoker、ChannelOutboundInvoker、AttributeMap 接口,ChannelHandler Context( 上下文 )接口,作为 ChannelPipeline 中的节点
4.1 AbstractChannelHandlerContext
io.netty.channel.AbstractChannelHandlerContext
,实现 ChannelHandlerContext、ResourceLeakHint 接口,继承 DefaultAttributeMap 类,ChannelHandlerContext 抽象基类
4.1.3 setAddComplete
#setAddComplete()
方法,设置 ChannelHandler 添加完成。完成后,状态有两种结果:
REMOVE_COMPLETE
ADD_COMPLETE
4.2 HeadContext
HeadContext ,实现 ChannelOutboundHandler、ChannelInboundHandler 接口,继承 AbstractChannelHandlerContext 抽象类,pipe 头节点 Context 实现类。
4.2.2 handler
#handler()
方法,返回自己作为 Context 的 ChannelHandler
4.3 TailContext
TailContext ,实现 ChannelInboundHandler 接口,继承 AbstractChannelHandlerContext 抽象类,pipe 尾节点 Context 实现类。
- 不同于 HeadContext、TailContext,它们自身就是一个 Context 的同时,也是一个 ChannelHandler 。而 DefaultChannelHandlerContext 是内嵌 一个 ChannelHandler 对象,即
handler
。这个属性通过构造方法传入,在<2>
处进行赋值。 -
<1>
处,调用父 AbstractChannelHandlerContext 的构造方法,通过判断传入的handler
是否为 ChannelInboundHandler 和 ChannelOutboundHandler 来分别判断是否为inbound
和outbound
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
ChannelPipeline(二)之添加 ChannelHandler
2. addLast
#addLast(ChannelHandler... handlers)
方法,添加任意数量的 ChannelHandler 对象
#addLast(EventExecutorGroup group, String name, ChannelHandler handler)
方法,添加一个 ChannelHandler 对象到 pipeline 中
synchronized
同步,为了防止多线程并发操作 pipeline 底层的双向链表。
3. checkMultiplicity
#checkMultiplicity(ChannelHandler handler)
方法,校验是否重复的 ChannelHandler
在 pipeline 中,一个创建的 ChannelHandler 对象,如果不使用 Netty @Sharable
注解,则只能添加到一个 Channel 的 pipeline 中。所以,如果我们想要重用一个 ChannelHandler 对象( 例如在 Spring 环境中 ),则必须给这个 ChannelHandler 添加 @Sharable
注解。
4.1 generateName
#generateName(ChannelHandler)
方法,根据 ChannelHandler 生成一个唯一名字
5. newContext
#newContext(EventExecutorGroup group, String name, ChannelHandler handler)
方法,创建 DefaultChannelHandlerContext 节点。而这个节点,内嵌传入的 ChannelHandler 参数
一共有三种情况:
-
<1>
,当不传入 EventExecutorGroup 时,不创建子执行器。即,使用 Channel 所注册的 EventLoop 作为执行器。对于我们日常使用,基本完全都是这种情况。 -
<2>
,根据配置项ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP
,每个 Channel 从 EventExecutorGroup 获得不同 EventExecutor 执行器。 -
<3>
,通过childExecutors
缓存实现,每个 Channel 从 EventExecutorGroup 获得相同 EventExecutor 执行器。是否获得相同的 EventExecutor 执行器,这就是<2>
、<3>
的不同
6. addLast0
#addLast0(AbstractChannelHandlerContext newCtx)
方法,添加到最后一个节点。注意,实际上,是添加到 tail
节点之前
7. callHandlerAdded0
#callHandlerAdded0(AbstractChannelHandlerContext)
方法,执行回调 ChannelHandler 添加完成( added )事件
为什么会有 PendingHandlerCallback 呢?
因为 ChannelHandler 添加到 pipeline 中,会触发 ChannelHandler 的添加完成( added )事件,并且该事件需要在 Channel 所属的 EventLoop 中执行。
但是 Channel 并未注册在 EventLoop 上时,需要暂时将“触发 ChannelHandler 的添加完成( added )事件”的逻辑,作为一个 PendingHandlerCallback 进行“缓存”。在 Channel 注册到 EventLoop 上时,进行回调执行
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
ChannelPipeline(三)之移除 ChannelHandler
2. remove
#remove(ChannelHandler handler)
方法,从 pipeline 移除指定的 ChannelHandler 对象
synchronized
同步,为了防止多线程并发操作 pipeline 底层的双向链表。
- 第 6 行:调用
#remove0(AbstractChannelHandlerContext ctx)
方法,从 pipeline 移除指定的 AbstractChannelHandlerContext 节点
4. callHandlerRemoved0
#callHandlerRemoved0(AbstractChannelHandlerContext)
方法,执行回调 ChannelHandler 移除完成( removed )事件
5. PendingHandlerRemovedTask
PendingHandlerRemovedTask 实现 PendingHandlerCallback 抽象类,用于回调移除 ChannelHandler 节点
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
ChannelPipeline(四)之 Outbound 事件的传播
Outbound 事件是【请求】事件(由 Channel 发起一个请求, 并最终由 Unsafe 处理这个请求)
从 tail 传递到 head
调用 AbstractChannelHandlerContext#executor()
方法,获得下一个 Outbound 节点的执行器
如果未设置子执行器,则使用 Channel 的 EventLoop 作为执行器。???? 一般情况下,我们可以忽略子执行器的逻辑,也就是说,可以直接认为是使用 Channel 的 EventLoop 作为执行器。
本文暂时只分享了 bind 这个 Outbound 事件。剩余的其他事件,胖友可以自己进行调试和理解。例如:connect 事件
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
ChannelPipeline(五)之 Inbound 事件的传播
Inbound 事件是【通知】事件, 当某件事情已经就绪后, 通知上层.
Inbound 事件发起者是 Unsafe
Inbound 事件的处理者是 TailContext, 如果用户没有实现自定义的处理方法, 那么Inbound 事件默认的处理者是 TailContext, 并且其处理方法是空实现
Inbound 事件在 Pipeline 中传输方向是 head
( 头 ) -> tail
( 尾 )
在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Handler, 则需要调用 ctx.fireIN_EVT
(例如 ctx.fireChannelActive
) 将此事件继续传播下去。
2. ChannelInboundInvoker
我们可以看到 Inbound 事件的其中之一 fireChannelActive ,本文就以 fireChannelActive 的过程
3. DefaultChannelPipeline
DefaultChannelPipeline#fireChannelActive()
方法的实现
4. AbstractChannelHandlerContext#invokeChannelActive
AbstractChannelHandlerContext#invokeChannelActive(final AbstractChannelHandlerContext next)
静态方法,
7. TailContext
在方法内部,会调用 DefaultChannelPipeline#onUnhandledInboundChannelActive()
方法
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
ChannelPipeline(六)之异常事件的传播
2. notifyOutboundHandlerException
我们以 Outbound 事件中的 bind 举例子,代码如下:
AbstractChannelHandlerContext#notifyOutboundHandlerException(Throwable cause, ChannelPromise promise)
方法,通知 Outbound 事件的传播,发生异常
在方法内部,会调用 PromiseNotificationUtil#tryFailure(Promise<?> p, Throwable cause, InternalLogger logger)
方法,通知 bind 事件对应的 Promise 对应的监听者们
3. notifyHandlerException
我们以 Inbound 事件中的 fireChannelActive 举例子
如果 Exception Caught 事件在 pipeline 中的传播过程中,一直没有处理掉该异常的节点,最终会到达尾节点 tail