Netty 源码分析之 二 事件流在ChannlePipeline中的流转
channelPipeline是一个接口,其实现类为DefaultChannelPipeline;通常像pipeline中添加channelHandler时最终都会调用以下方法:
@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}
return this;
}
将handler与其关联的name,如果name为空,会生成一个handler classNmae + "#0” 形式;最终封装成一个双链表的DefaultChannelHandlerContext节点,添加到pipeline中。在DefaultChannelHandlerContext有 inbound 和 outbound 两个 boolean 变量,private final boolean inbound; private final boolean outbound;代表着不同的事件,而不同的事件对应的事件流向是不相同的;
从netty api中可以看出outBound对应的是stock的写入事件,其事件的流转是从上完下进行的;即首先由Channel发送某种事件数据流,经过一系列的Handler后,通过最后的handler 将事件写出去。这里比如客户端的连接事件或者服务端的bind事件;而inbound事件对应的stock的读取事件,其事件流转的途径是从下往上;即首先从TCP缓存区中读取数据,然后经过不同的Handler进行处理;这里对应IO流监听的读取事件;
Inbound 事件传播方法有:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
Oubound 事件传输方法有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
注意, 如果我们捕获了一个事件, 并且想让这个事件继续传递下去, 那么需要调用 Context 相应的传播方法.
例如:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
1 Outbound 事件传播操作;
以客户端connect事件为例:看代码,客户端在发起连接时,会经过一段以下代码:
首先是tail节点,也就是尾节点调用connect方法,在findContextOutbound方法中,查找outbound属性为真的节点,也就是channelOutboundHandler类型的;也就是说outbound事件是有tail开始传播,执行的过程是从上往下执行的;
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validatePromise(promise, false);
final DefaultChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
private DefaultChannelHandlerContext findContextOutbound() {
DefaultChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
如果没有handler重写connect方法时,最终将会由headHandler去执行connect,操作,因为headHandler节点是outbound为真的节点;
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
最终还是交给底层工具类unsafe去执行连接操作;
2 inbound 事件传播操作
以上面的连接操作继续讲解,outbound与inbound事件如何联系起来,形成一个环形事件的;当unsafe执行connect方法时,其实是调用其实现类,AbstractNioUnsafe.connect方法;在doConnect中,如果连接成功会执行fulfillConnectPromise方法;接着pipeline().fireChannelActive();而fireChannelActive方法是pileline**下一个handler的入口方法;
@Override
public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.tryFailure(t);
closeIfClosed();
}
}
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
return;
}
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
}
}
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
public ChannelHandlerContext fireChannelActive() {
final DefaultChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
return this;
}
private DefaultChannelHandlerContext findContextInbound() {
DefaultChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
private void invokeChannelActive() {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
而fireChannelActive,首先调用的是headHandler 节点的方法,fireChannelActive方法;而该方法中的findContextInbound是从头节点开始查找第一个inbound属性为true的节点;然后调用该headler的channelActive方法;因此inbound事件是有head开始传播,事件流从上往下执行,逐个调用其channelActive方法,通过可以在该方法中,通过调用fireChannelActive方法来触发下一个handler;如果没有自定义类headler,那么inbound事件最终流转到tailHandler,而其channelActive就是个空方法,那么该事件最终无法处理就结束了。
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
总结
对于 Outbound事件:
-
Outbound 事件是请求事件(由 Connect 发起一个请求, 并最终由 unsafe 处理这个请求)
-
Outbound 事件的发起者是 Channel
-
Outbound 事件的处理者是 unsafe
-
Outbound 事件在 Pipeline 中的传输方向是 tail -> head.
-
在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Hnalder, 则需要调用 ctx.xxx (例如 ctx.connect) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.
-
Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT
对于 Inbound 事件:
-
Inbound 事件是通知事件, 当某件事情已经就绪后, 通知上层.
-
Inbound 事件发起者是 unsafe
-
Inbound 事件的处理者是 Channel, 如果用户没有实现自定义的处理方法, 那么Inbound 事件默认的处理者是 TailContext, 并且其处理方法是空实现.
-
Inbound 事件在 Pipeline 中传输方向是 head -> tail
-
在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Hnalder, 则需要调用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.
-
Outbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT