Netty Source Code Analysis (4): Pipeline
Netty version
- This analysis uses `io.netty:netty-all:4.1.33.Final`.
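Pipeline example
- Server code

      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.channel.ChannelFuture;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.ChannelOption;
      import io.netty.channel.ChannelPipeline;
      import io.netty.channel.EventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;

      import java.net.InetSocketAddress;

      public final class PipelineServer {
          public static void main(String[] args) throws Exception {
              // one boss thread accepts connections, the worker group handles I/O
              EventLoopGroup bossGroup = new NioEventLoopGroup(1);
              EventLoopGroup workerGroup = new NioEventLoopGroup();
              try {
                  ServerBootstrap b = new ServerBootstrap();
                  b.group(bossGroup, workerGroup)
                          .channel(NioServerSocketChannel.class)
                          .localAddress(new InetSocketAddress(8080))
                          .childOption(ChannelOption.TCP_NODELAY, true)
                          .childHandler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              public void initChannel(SocketChannel ch) {
                                  // handlers are appended in this order
                                  ChannelPipeline pipeline = ch.pipeline();
                                  pipeline.addLast(new InBoundHandler1());
                                  pipeline.addLast(new InBoundHandler2());
                              }
                          });
                  ChannelFuture f = b.bind().sync();
                  f.channel().closeFuture().sync();
              } finally {
                  bossGroup.shutdownGracefully();
                  workerGroup.shutdownGracefully();
              }
          }
      }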
- Handler code

      // InBoundHandler1.java
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.channel.ChannelInboundHandlerAdapter;

      public class InBoundHandler1 extends ChannelInboundHandlerAdapter {
          @Override
          public void channelActive(ChannelHandlerContext ctx) {
              System.out.println("channelActive 1");
          }

          @Override
          public void channelRegistered(ChannelHandlerContext ctx) {
              System.out.println("channelRegistered 1");
          }

          @Override
          public void handlerAdded(ChannelHandlerContext ctx) {
              System.out.println("handlerAdded 1");
          }

          @Override
          public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
              System.out.println("channelRead 1");
          }
      }

      // InBoundHandler2.java (same imports as InBoundHandler1)
      public class InBoundHandler2 extends ChannelInboundHandlerAdapter {
          @Override
          public void channelActive(ChannelHandlerContext ctx) {
              System.out.println("channelActive 2");
          }

          @Override
          public void channelRegistered(ChannelHandlerContext ctx) {
              System.out.println("channelRegistered 2");
          }

          @Override
          public void handlerAdded(ChannelHandlerContext ctx) {
              System.out.println("handlerAdded 2");
          }

          @Override
          public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
              System.out.println("channelRead 2");
          }
      }
ChannelHandlerContext
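- The declaration shows that `ChannelHandlerContext` extends three interfaces, which gives it three capabilities: storing custom attributes (`AttributeMap`), propagating inbound events such as reads (`ChannelInboundInvoker`), and propagating outbound operations such as writes (`ChannelOutboundInvoker`).

      public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
          Channel channel();
          EventExecutor executor();
          String name();
          // ... omitted ...
          ChannelHandlerContext fireXXXX(Object evt); // placeholder for the fireXxx(...) family of methods
          // ... omitted ...
          ChannelHandler handler();
          ChannelPipeline pipeline();
          ByteBufAllocator alloc();
      }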
- The UML class diagram shows that the main implementation of `ChannelHandlerContext` is the abstract class `AbstractChannelHandlerContext`.
- The core fields of `AbstractChannelHandlerContext` show that every `ChannelHandlerContext` is associated with one `ChannelPipeline`, and that `AbstractChannelHandlerContext` is a node in a doubly linked list with a predecessor and a successor. The links between nodes are maintained by the `ChannelPipeline`, as analyzed later.

      // successor node
      volatile AbstractChannelHandlerContext next;
      // predecessor node
      volatile AbstractChannelHandlerContext prev;

      // ChannelHandler#handlerAdded is about to be called
      private static final int ADD_PENDING = 1;
      // ChannelHandler#handlerAdded has been called
      private static final int ADD_COMPLETE = 2;
      // ChannelHandler#handlerRemoved has been called
      private static final int REMOVE_COMPLETE = 3;

      private final boolean inbound;
      private final boolean outbound;
      private final DefaultChannelPipeline pipeline;
- The default implementation is `DefaultChannelHandlerContext`. Its `isInbound()` and `isOutbound()` methods show that Netty uses `instanceof` to decide whether a handler is inbound, outbound, or both.

      final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

          private final ChannelHandler handler;

          DefaultChannelHandlerContext(
                  DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
              super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
              if (handler == null) {
                  throw new NullPointerException("handler");
              }
              this.handler = handler;
          }

          @Override
          public ChannelHandler handler() {
              return handler;
          }

          private static boolean isInbound(ChannelHandler handler) {
              return handler instanceof ChannelInboundHandler;
          }

          private static boolean isOutbound(ChannelHandler handler) {
              return handler instanceof ChannelOutboundHandler;
          }
      }
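The `instanceof` check can be tried out without Netty on the classpath. The following is a minimal sketch rather than Netty's code: `SketchHandler`, `SketchInbound`, `SketchOutbound` and `DirectionSketch` are hypothetical stand-ins for `ChannelHandler`, `ChannelInboundHandler` and `ChannelOutboundHandler`.

```java
class DirectionSketch {
    // Hypothetical stand-ins for ChannelHandler, ChannelInboundHandler and ChannelOutboundHandler.
    interface SketchHandler {}
    interface SketchInbound extends SketchHandler {}
    interface SketchOutbound extends SketchHandler {}

    // A handler that only implements the inbound interface.
    static final class InOnly implements SketchInbound {}
    // A handler that implements both interfaces and is therefore inbound and outbound.
    static final class Duplex implements SketchInbound, SketchOutbound {}

    // Same idea as isInbound/isOutbound in the listing above: the direction is derived from the type.
    static boolean isInbound(SketchHandler h) { return h instanceof SketchInbound; }
    static boolean isOutbound(SketchHandler h) { return h instanceof SketchOutbound; }

    static String describe(SketchHandler h) {
        return "inbound=" + isInbound(h) + ", outbound=" + isOutbound(h);
    }

    public static void main(String[] args) {
        System.out.println(describe(new InOnly()));  // inbound=true, outbound=false
        System.out.println(describe(new Duplex()));  // inbound=true, outbound=true
    }
}
```

Because the two flags are computed once in the constructor, event propagation can later skip contexts of the wrong direction by reading a boolean instead of repeating the type check.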
Creating the ChannelPipeline
- As the earlier analysis of `Channel` creation showed, a default `ChannelPipeline` (a `DefaultChannelPipeline` instance) is created whenever a `Channel` is created.
- The `DefaultChannelPipeline` constructor creates a `TailContext` and a `HeadContext`, then points the successor of head (`head.next`) at `tail` and the predecessor of tail (`tail.prev`) at `head`. The initial state of the `ChannelPipeline` is therefore just `head <-> tail`.

      // fields of AbstractChannelHandlerContext, repeated here for reference
      volatile AbstractChannelHandlerContext next;
      volatile AbstractChannelHandlerContext prev;

      // constructor of DefaultChannelPipeline
      protected DefaultChannelPipeline(Channel channel) {
          this.channel = ObjectUtil.checkNotNull(channel, "channel");
          succeededFuture = new SucceededChannelFuture(channel, null);
          voidPromise = new VoidChannelPromise(channel, true);

          tail = new TailContext(this);
          head = new HeadContext(this);

          // wire up the linked list
          head.next = tail;
          tail.prev = head;
      }
- Creating `TailContext`: the class declaration and constructor show that `TailContext` is a `ChannelInboundHandler` and, at the same time, a `ChannelHandlerContext`.

      final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

          TailContext(DefaultChannelPipeline pipeline) {
              super(pipeline, null, TAIL_NAME, true, false);
              // mark the context as already added (ADD_COMPLETE)
              setAddComplete();
          }
          // ... omitted ...
      }

      // superclass constructor
      AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                    boolean inbound, boolean outbound) {
          this.name = ObjectUtil.checkNotNull(name, "name");
          this.pipeline = pipeline;
          this.executor = executor;
          this.inbound = inbound;
          this.outbound = outbound;
          // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
          ordered = executor == null || executor instanceof OrderedEventExecutor;
      }
- Creating `HeadContext`: the class declaration and constructor below show that `HeadContext` is both a `ChannelInboundHandler` and a `ChannelOutboundHandler`, and is also a `ChannelHandlerContext`.

      final class HeadContext extends AbstractChannelHandlerContext
              implements ChannelOutboundHandler, ChannelInboundHandler {

          private final Unsafe unsafe;

          HeadContext(DefaultChannelPipeline pipeline) {
              super(pipeline, null, HEAD_NAME, true, true);
              unsafe = pipeline.channel().unsafe();
              // mark the context as already added (ADD_COMPLETE)
              setAddComplete();
          }
          // ... omitted ...
      }
Adding a ChannelHandler
- The main method for adding a handler to a `ChannelPipeline` is `addLast`. Application code typically looks like this:

      .childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          public void initChannel(SocketChannel ch) {
              ChannelPipeline pipeline = ch.pipeline();
              pipeline.addLast(new InBoundHandler1());
              pipeline.addLast(new InBoundHandler2());
          }
      });
Now look at the `addLast` method of `DefaultChannelPipeline`:

    @Override
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // check whether the ChannelHandler is being added more than once
            checkMultiplicity(handler);

            // create the ChannelHandlerContext node
            newCtx = newContext(group, filterName(name, handler), handler);

            // link the node in just before tail (addLast0 is shown in step 2)
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                // put the context into the ADD_PENDING state
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
Step 1: check whether the `ChannelHandler` has already been added and whether it is sharable (`@Sharable`). Every `ChannelHandlerAdapter` has a `boolean added` field, `false` by default, that records whether the handler has been added to a pipeline.

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            // flip the handler's added flag to true
            h.added = true;
        }
    }
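The effect of this check can be reproduced in isolation. The sketch below is an illustration under stated assumptions, not Netty's implementation: `SketchHandler` is a hypothetical class whose `sharable` flag stands in for the `@Sharable` annotation, and `IllegalStateException` stands in for `ChannelPipelineException`.

```java
class MultiplicitySketch {
    // Hypothetical handler type; 'sharable' stands in for the @Sharable annotation.
    static class SketchHandler {
        final boolean sharable;
        boolean added; // false by default, like ChannelHandlerAdapter.added

        SketchHandler(boolean sharable) { this.sharable = sharable; }
    }

    // Mirrors the logic of checkMultiplicity: a non-sharable handler may be added only once.
    static void checkMultiplicity(SketchHandler h) {
        if (!h.sharable && h.added) {
            throw new IllegalStateException("not a sharable handler, so it can't be added multiple times");
        }
        h.added = true;
    }

    // Returns true if the same instance passes the check twice in a row.
    static boolean canAddTwice(SketchHandler h) {
        checkMultiplicity(h);
        try {
            checkMultiplicity(h);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canAddTwice(new SketchHandler(true)));  // true
        System.out.println(canAddTwice(new SketchHandler(false))); // false
    }
}
```

In practice this is why a stateful handler is usually created with `new` inside `initChannel` for each connection, while a stateless one can be marked `@Sharable` and reused.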
Step 2: create a `ChannelHandlerContext` node and link it into the list just before `tail`.

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        // "this" is the current DefaultChannelPipeline
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

![After calling addLast](https://gitee.com/jannal/images/raw/master/netty-source/15508492134621.jpg)
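The list manipulation is easier to follow in a stripped-down model. The following sketch assumes a hypothetical `Node` class in place of `AbstractChannelHandlerContext` and keeps only names; it starts with the two sentinel nodes wired as `head <-> tail` and performs the same four pointer updates as `addLast0`.

```java
import java.util.ArrayList;
import java.util.List;

class PipelineListSketch {
    // Hypothetical node type standing in for AbstractChannelHandlerContext.
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) { this.name = name; }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    PipelineListSketch() {
        // initial state: head <-> tail
        head.next = tail;
        tail.prev = head;
    }

    // Inserts a new node just before tail using the same four assignments as addLast0.
    synchronized PipelineListSketch addLast(String name) {
        Node newNode = new Node(name);
        Node prev = tail.prev;
        newNode.prev = prev;
        newNode.next = tail;
        prev.next = newNode;
        tail.prev = newNode;
        return this;
    }

    // Walks the list from head to tail and collects the node names.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        PipelineListSketch p = new PipelineListSketch();
        System.out.println(p.names()); // [head, tail]
        p.addLast("InBoundHandler1").addLast("InBoundHandler2");
        System.out.println(p.names()); // [head, InBoundHandler1, InBoundHandler2, tail]
    }
}
```

The sentinel nodes mean `addLast` never has to handle an empty list: `tail.prev` always exists, even when it is `head`.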
Step 3: invoke the handler's `handlerAdded()` callback (the "added" event). Older Netty versions called `handlerAdded` first and set `ADD_COMPLETE` afterwards. That ordering was a problem: if `handlerAdded` generated any pipeline events, the handler would miss them because its context was not yet in the `ADD_COMPLETE` state. Newer versions therefore set the state first and then invoke the callback.

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
            // handlerAdded failed: try to unlink the context again
            boolean removed = false;
            try {
                remove0(ctx);
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }

    final void callHandlerAdded() throws Exception {
        // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
        // any pipeline events ctx.handler() will miss them because the state will not allow it.
        if (setAddComplete()) {
            // for the example above, this invokes handlerAdded of the ChannelInitializer passed to childHandler
            handler().handlerAdded(this);
        }
    }

In the example at the top of this article, the first handler added to each child channel's pipeline is the `ChannelInitializer` passed to `childHandler`, so `ctx.callHandlerAdded()` ends up invoking that `ChannelInitializer`'s `handlerAdded()` method.
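Why the order of "set state" and "invoke callback" matters can be shown with a toy model. This is a simplified sketch under assumptions, not Netty's code: `Ctx` is a hypothetical context whose `fireEvent` delivers an event to its handler only in the `ADD_COMPLETE` state, and whose `handlerAdded` callback happens to generate one event.

```java
import java.util.ArrayList;
import java.util.List;

class AddOrderSketch {
    static final int INIT = 0;
    static final int ADD_COMPLETE = 2;

    // Hypothetical context; 'received' records the events its handler actually saw.
    static final class Ctx {
        int state = INIT;
        final List<String> received = new ArrayList<>();

        // The handler only sees an event once its context is in the ADD_COMPLETE state.
        void fireEvent(String evt) {
            if (state == ADD_COMPLETE) {
                received.add(evt);
            }
        }

        // A handlerAdded callback that generates a pipeline event.
        void handlerAdded() {
            fireEvent("event-from-handlerAdded");
        }

        // Old ordering: callback first, state afterwards.
        void addCallbackFirst() {
            handlerAdded();
            state = ADD_COMPLETE;
        }

        // Current ordering: state first, callback afterwards.
        void addStateFirst() {
            state = ADD_COMPLETE;
            handlerAdded();
        }
    }

    public static void main(String[] args) {
        Ctx oldOrder = new Ctx();
        oldOrder.addCallbackFirst();
        System.out.println(oldOrder.received); // []

        Ctx newOrder = new Ctx();
        newOrder.addStateFirst();
        System.out.println(newOrder.received); // [event-from-handlerAdded]
    }
}
```

With the old ordering the event fired from inside the callback is missed; with the current ordering the handler receives it.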
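Removing a Handler
- When would we remove a handler dynamically? One example is authentication on a long-lived connection: once authentication succeeds, the authentication handler can be removed so that it does not run for every subsequent message.

      public class AuthTokenHandler extends SimpleChannelInboundHandler<ByteBuf> {
          @Override
          protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
              long password = msg.readLong();
              if (password == 1L) {
                  // authenticated: this handler is no longer needed on the connection
                  ctx.pipeline().remove(this);
              } else {
                  // authentication failed: close the connection
                  ctx.close();
              }
          }
      }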
- Now look at the `remove` method of `DefaultChannelPipeline`. The callbacks `callHandlerRemoved0` and `callHandlerRemoved` are listed in step 3.

      @Override
      public final ChannelPipeline remove(ChannelHandler handler) {
          remove(getContextOrDie(handler));
          return this;
      }

      private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
          assert ctx != head && ctx != tail;

          synchronized (this) {
              remove0(ctx);

              // If the registered is false it means that the channel was not registered on an eventloop yet.
              // In this case we remove the context from the pipeline and add a task that will call
              // ChannelHandler.handlerRemoved(...) once the channel is registered.
              if (!registered) {
                  callHandlerCallbackLater(ctx, false);
                  return ctx;
              }

              EventExecutor executor = ctx.executor();
              if (!executor.inEventLoop()) {
                  executor.execute(new Runnable() {
                      @Override
                      public void run() {
                          callHandlerRemoved0(ctx);
                      }
                  });
                  return ctx;
              }
          }
          callHandlerRemoved0(ctx);
          return ctx;
      }
Step 1: walk the `ChannelHandlerContext` list to find the context of the handler to be removed.
```java
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    // start from the node after head
    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == null) {
            return null;
        }
        // handlers are compared by identity, not by equals()
        if (ctx.handler() == handler) {
            return ctx;
        }
        ctx = ctx.next;
    }
}
```
Step 2: inside the `synchronized` block, `remove0` relinks the node's predecessor and successor so that they bypass the node.

    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
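Steps 1 and 2 together amount to "find the node by handler identity, then unlink it". A minimal self-contained sketch of that combination follows; `RemoveSketch` and `Node` are hypothetical names, and plain `Object` instances stand in for handlers.

```java
import java.util.NoSuchElementException;

class RemoveSketch {
    // Hypothetical node type standing in for AbstractChannelHandlerContext.
    static final class Node {
        final Object handler;
        Node prev;
        Node next;

        Node(Object handler) { this.handler = handler; }
    }

    private final Node head = new Node(null);
    private final Node tail = new Node(null);

    RemoveSketch() {
        head.next = tail;
        tail.prev = head;
    }

    synchronized void addLast(Object handler) {
        Node n = new Node(handler);
        n.prev = tail.prev;
        n.next = tail;
        tail.prev.next = n;
        tail.prev = n;
    }

    // Step 1: walk from head.next and compare handlers by identity.
    private Node contextOrDie(Object handler) {
        for (Node n = head.next; n != tail; n = n.next) {
            if (n.handler == handler) {
                return n;
            }
        }
        throw new NoSuchElementException(String.valueOf(handler));
    }

    // Step 2: make the neighbours bypass the node; the node's own pointers are left untouched.
    synchronized void remove(Object handler) {
        Node ctx = contextOrDie(handler);
        ctx.prev.next = ctx.next;
        ctx.next.prev = ctx.prev;
    }

    synchronized int size() {
        int count = 0;
        for (Node n = head.next; n != tail; n = n.next) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        RemoveSketch pipeline = new RemoveSketch();
        Object authHandler = new Object();
        Object businessHandler = new Object();
        pipeline.addLast(authHandler);
        pipeline.addLast(businessHandler);

        pipeline.remove(authHandler);
        System.out.println(pipeline.size()); // 1
    }
}
```

Removing a handler that is not in the list throws `NoSuchElementException`, matching the behavior of `getContextOrDie` in the listing above.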
Step 3: invoke the `handlerRemoved()` callback.

    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            ctx.callHandlerRemoved();
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }

    final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
                // invoke the handler's handlerRemoved callback
                handler().handlerRemoved(this);
            }
        } finally {
            // mark the context as REMOVE_COMPLETE in any case
            setRemoved();
        }
    }
ChannelInitializer
- The class declaration below shows that `ChannelInitializer` is a `@Sharable` inbound handler: it extends `ChannelInboundHandlerAdapter`.

      @Sharable
      public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter
- As the analysis above shows, the user's handlers are added to the pipeline from within `handlerAdded`, which calls `initChannel`. Once they have been added, the `ChannelHandlerContext` that holds the `ChannelInitializer` itself is removed.

      @Override
      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
          if (ctx.channel().isRegistered()) {
              // This should always be true with our current DefaultChannelPipeline implementation.
              // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
              // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
              // will be added in the expected order.
              if (initChannel(ctx)) {
                  // We are done with init the Channel, removing the initializer now.
                  removeState(ctx);
              }
          }
      }

      private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
          if (initMap.add(ctx)) { // Guard against re-entrance.
              try {
                  initChannel((C) ctx.channel());
              } catch (Throwable cause) {
                  // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                  // We do so to prevent multiple calls to initChannel(...).
                  exceptionCaught(ctx, cause);
              } finally {
                  ChannelPipeline pipeline = ctx.pipeline();
                  // remove the current ChannelInitializer, that is, remove itself
                  if (pipeline.context(this) != null) {
                      pipeline.remove(this);
                  }
              }
              return true;
          }
          return false;
      }

      // abstract method in which we add our own ChannelHandlers
      protected abstract void initChannel(C ch) throws Exception;
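The "add the real handlers, then remove yourself" pattern can be modelled in a few lines. The sketch below is an illustration only and makes several simplifying assumptions: `SketchPipeline` is a plain list rather than a linked list of contexts, `SketchHandler`, `SketchInitializer` and `Named` are hypothetical types, and the re-entrance guard is keyed by pipeline instead of by context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class InitializerSketch {
    // Hypothetical handler interface with an "added" callback.
    interface SketchHandler {
        default void handlerAdded(SketchPipeline pipeline) {}
    }

    // Hypothetical pipeline: appends the handler, then invokes its callback.
    static final class SketchPipeline {
        private final List<SketchHandler> handlers = new ArrayList<>();

        SketchPipeline addLast(SketchHandler h) {
            handlers.add(h);
            h.handlerAdded(this);
            return this;
        }

        boolean contains(SketchHandler h) { return handlers.contains(h); }

        void remove(SketchHandler h) { handlers.remove(h); }

        List<String> names() {
            List<String> out = new ArrayList<>();
            for (SketchHandler h : handlers) {
                out.add(h.toString());
            }
            return out;
        }
    }

    static final class Named implements SketchHandler {
        private final String name;
        Named(String name) { this.name = name; }
        @Override public String toString() { return name; }
    }

    // One-shot initializer: runs initChannel once per pipeline, then removes itself.
    abstract static class SketchInitializer implements SketchHandler {
        private final Set<SketchPipeline> initialized = ConcurrentHashMap.newKeySet();

        abstract void initChannel(SketchPipeline pipeline);

        @Override
        public void handlerAdded(SketchPipeline pipeline) {
            if (initialized.add(pipeline)) { // guard against re-entrance
                try {
                    initChannel(pipeline);
                } finally {
                    if (pipeline.contains(this)) {
                        pipeline.remove(this);
                    }
                }
            }
        }
    }

    static SketchPipeline demo() {
        SketchPipeline p = new SketchPipeline();
        p.addLast(new SketchInitializer() {
            @Override
            void initChannel(SketchPipeline pipeline) {
                pipeline.addLast(new Named("InBoundHandler1"));
                pipeline.addLast(new Named("InBoundHandler2"));
            }
        });
        return p;
    }

    public static void main(String[] args) {
        System.out.println(demo().names()); // [InBoundHandler1, InBoundHandler2]
    }
}
```

After `demo()` returns, the initializer is gone and only the two handlers it registered remain, which is the state the pipeline of the example server ends up in for each accepted connection.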
- `ChannelInitializer` is itself a `ChannelHandler`, so when is it added to the `ChannelPipeline`? The next section answers that.
How the ChannelInitializer is added
Server side
- When the server binds, the `init()` method of `ServerBootstrap` adds a `ChannelInitializer` to the `pipeline` of the `ServerSocketChannel`:

      p.addLast(new ChannelInitializer<Channel>() {
          @Override
          public void initChannel(final Channel ch) throws Exception {
              final ChannelPipeline pipeline = ch.pipeline();
              // the handler configured through ServerBootstrap.handler(...), if any
              ChannelHandler handler = config.handler();
              if (handler != null) {
                  pipeline.addLast(handler);
              }

              // the acceptor is added later, from a task on the channel's event loop
              ch.eventLoop().execute(new Runnable() {
                  @Override
                  public void run() {
                      pipeline.addLast(new ServerBootstrapAcceptor(
                              ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                  }
              });
          }
      });