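Netty Source Code Reading: Server Creation
I used Netty in a previous project. Taking advantage of some free time between interviews, I am going through the Netty source again, starting with how the server is created. Without further ado, straight to the code.
First, here is roughly what the overall server-side setup looks like: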
public void bind(int port) throws Exception {
    EventLoopGroup workGroup = new NioEventLoopGroup();
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 24)
         .childHandler(new ChildChannelHandler());
        // Bind and wait until the bind completes.
        ChannelFuture f = b.bind(port).sync();
        // Block until the server channel is closed.
        f.channel().closeFuture().sync();
    } finally {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
}

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        System.out.println("server initChannel..");
        socketChannel.pipeline().addLast(new ServerHandler());
    }
}
OK, let's first see what happens during new NioEventLoopGroup():
public NioEventLoopGroup() {
    this(0);
}

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (ThreadFactory) null);
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, new Object[]{selectorProvider});
}

After this chain of delegating constructors, the call that finally runs is super(nThreads, threadFactory, new Object[]{selectorProvider}), that is, the parent class constructor. It receives three arguments: the thread count (0 here), a threadFactory that is null, and a SelectorProvider, which as the name suggests is a class that provides Selectors. A look at its source confirms this. If it looks unfamiliar at this point, I suggest reading up on Java NIO first.
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    if (loadProviderFromProperty())
                        return provider;
                    if (loadProviderAsService())
                        return provider;
                    provider = sun.nio.ch.DefaultSelectorProvider.create();
                    return provider;
                }
            });
    }
}

public abstract AbstractSelector openSelector()
    throws IOException;

public abstract ServerSocketChannel openServerSocketChannel()
    throws IOException;

public abstract SocketChannel openSocketChannel()
    throws IOException;
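To make the role of SelectorProvider concrete, here is a minimal sketch that uses only the JDK. The class name SelectorProviderSketch and its methods are made up for illustration; the only real API calls are SelectorProvider.provider(), openSelector(), and the Selector methods.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical helper class, not part of Netty or the JDK.
class SelectorProviderSketch {
    // provider() caches the system-wide provider, so two lookups return the same instance.
    static boolean sameProvider() {
        return SelectorProvider.provider() == SelectorProvider.provider();
    }

    // Opens a selector through the provider, checks it, and closes it again.
    static boolean openAndClose() {
        SelectorProvider provider = SelectorProvider.provider();
        try {
            Selector selector = provider.openSelector();
            try {
                return selector.isOpen() && selector.provider() == provider;
            } finally {
                selector.close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both methods should return true on a standard JDK, which matches the caching logic in provider() above.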
OK, as just mentioned, the super keyword calls the parent class constructor. The parent of NioEventLoopGroup is MultithreadEventLoopGroup, so let's look at how its constructor is implemented:
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    // nThreads == 0 means "use the default": twice the number of available processors.
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
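The thread-count rule above can be tried in isolation. The sketch below is an approximation under one assumption: Integer.getInteger stands in for Netty's SystemPropertyUtil.getInt, and the class and method names are hypothetical.

```java
// Hypothetical helper class illustrating the default thread-count rule.
class EventLoopThreadsSketch {
    // Assumption: Integer.getInteger replaces SystemPropertyUtil.getInt in this sketch.
    static int defaultThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    // 0 means "use the default"; any other value is passed through unchanged.
    static int resolve(int nThreads) {
        return nThreads == 0 ? defaultThreads() : nThreads;
    }
}
```

So new NioEventLoopGroup() with no arguments ends up with the default count, while an explicit value such as 4 is kept as is.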
MultithreadEventLoopGroup in turn calls its own parent's constructor. Following the chain upward, we reach its parent class, MultithreadEventExecutorGroup, whose constructor looks like this:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }

    children = new SingleThreadEventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // Creation failed: shut down the children created so far.
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
}

Does this look familiar? Recalling how Netty works, this code is where Netty's Reactor pattern takes shape. The chooser picks which child handles the next piece of work; both implementations hand out children in round-robin order rather than at random. The children are the worker threads, of type SingleThreadEventExecutor. Let's look at that class.
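The two chooser classes are not reproduced in this article, so the following is only a sketch of round-robin selection as suggested by their names: a bit mask when the number of children is a power of two, a modulo otherwise. ChooserSketch and its members are hypothetical names, and the sketch returns an index rather than an executor.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin index chooser; not Netty's actual classes.
class ChooserSketch {
    private final AtomicInteger childIndex = new AtomicInteger();
    private final int length;

    ChooserSketch(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("length: " + length);
        }
        this.length = length;
    }

    // A positive int is a power of two when it has exactly one bit set.
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    // Returns the index of the child that should take the next piece of work.
    int next() {
        int i = childIndex.getAndIncrement();
        if (isPowerOfTwo(length)) {
            return i & (length - 1);      // cheap mask, valid only for powers of two
        }
        return Math.abs(i % length);      // general case
    }
}
```

With four children the indices cycle 0, 1, 2, 3, 0, and with three children 0, 1, 2, 0. The mask variant only saves a division; the resulting order is the same.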
From the methods of this class we can see that SingleThreadEventExecutor is the class that actually executes tasks. Things start to fall into place, don't they?
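As a rough illustration of the idea (not Netty's implementation), a single-threaded executor can be pictured as one thread draining a task queue, so tasks run one at a time in submission order. Every name in this sketch is hypothetical, and the 50 ms poll interval is an arbitrary choice.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded task executor; a sketch, not SingleThreadEventExecutor.
class SingleThreadExecutorSketch {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread thread;
    private volatile boolean running = true;

    SingleThreadExecutorSketch() {
        thread = new Thread(() -> {
            // Keep draining until shutdown is requested and the queue is empty.
            while (running || !taskQueue.isEmpty()) {
                try {
                    Runnable task = taskQueue.poll(50, TimeUnit.MILLISECONDS);
                    if (task != null) {
                        task.run();
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "sketch-loop");
        thread.setDaemon(true);
        thread.start();
    }

    void execute(Runnable task) {
        taskQueue.add(task);
    }

    void shutdown() {
        running = false;
    }

    // Submits n tasks and returns the order in which they actually ran.
    static List<Integer> demo(int n) {
        SingleThreadExecutorSketch loop = new SingleThreadExecutorSketch();
        List<Integer> order = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            final int id = i;
            loop.execute(() -> {
                order.add(id);
                done.countDown();
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            loop.shutdown();
        }
        return order;
    }
}
```

Because only one thread ever runs the tasks, code executed inside them needs no locking against other tasks of the same executor, which is the property the rest of the walkthrough relies on.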
Next, let's look at how a client connection is handled.
Remember this line in the MultithreadEventExecutorGroup constructor?
children[i] = newChild(threadFactory, args);
Let's see what it does:
@Override
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

public final class NioEventLoop extends SingleThreadEventLoop {

    NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
        super(parent, threadFactory, false);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        provider = selectorProvider;
        selector = openSelector();
    }
    // remaining members omitted
}
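As we can see, this line creates a NioEventLoop, which opens a selector in its constructor. Client connection handling is essentially managed by this class. Let's look at its run() method: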
protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp);

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).

                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
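The expression ioTime * (100 - ioRatio) / ioRatio in run() decides how long queued tasks may run relative to the time just spent on I/O. The sketch below isolates that arithmetic; the class and method names are made up, and the range check is an addition of the sketch (run() handles ioRatio == 100 in a separate branch).

```java
// Hypothetical helper isolating the ioRatio arithmetic from run().
class IoRatioSketch {
    // Given the time spent on I/O and the percentage of loop time reserved
    // for I/O, returns the time budget left for queued tasks.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 1..99)");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```

For example, with ioRatio = 50 the tasks get as much time as I/O took; with ioRatio = 80 and 800 ns of I/O, the tasks get 200 ns; with ioRatio = 25 and 100 ns of I/O, they get 300 ns.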
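Roughly, each iteration first checks whether any tasks are queued. If so, it calls the non-blocking selectNow() so that the tasks are not delayed; otherwise it blocks in select(). It then handles the ready I/O events in processSelectedKeys() and runs the queued tasks in runAllTasks(), dividing time between the two according to ioRatio. Let's look at processSelectedKeys():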
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            for (;;) {
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
                i++;
            }

            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}
Let's see what processSelectedKey does:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
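The bit tests in processSelectedKey can be tried on their own with the JDK's SelectionKey constants. This is a small sketch with hypothetical class and method names; each method mirrors one condition from the code above.

```java
import java.nio.channels.SelectionKey;

// Hypothetical helper mirroring the readyOps checks in processSelectedKey.
class ReadyOpsSketch {
    // True when the read branch would run: OP_READ, OP_ACCEPT, or the readyOps == 0 workaround.
    static boolean triggersRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    // True when the flush branch would run.
    static boolean triggersFlush(int readyOps) {
        return (readyOps & SelectionKey.OP_WRITE) != 0;
    }

    // Removes OP_CONNECT from an interest set and leaves the other bits untouched.
    static int clearConnect(int interestOps) {
        return interestOps & ~SelectionKey.OP_CONNECT;
    }
}
```

Note that OP_READ and OP_ACCEPT share one branch, so a server channel that becomes acceptable and a client channel that becomes readable both end up in unsafe.read().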
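We can see that when the ready set contains SelectionKey.OP_READ or SelectionKey.OP_ACCEPT (a new connection on a server channel shows up as OP_ACCEPT), unsafe.read() is executed. Let's see what it does. It turns out to be an abstract method, so let's look at the implementation in the NioMessageUnsafe subclass: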
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    final ChannelPipeline pipeline = pipeline();
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            for (;;) {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                // stop reading and remove op
                if (!config.isAutoRead()) {
                    break;
                }

                if (readBuf.size() >= maxMessagesPerRead) {
                    break;
                }
            }
        } catch (Throwable t) {
            exception = t;
        }
        setReadPending(false);
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            pipeline.fireChannelRead(readBuf.get(i));
        }

        readBuf.clear();
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                // ServerChannel should not be closed even on IOException because it can often continue
                // accepting incoming connections. (e.g. too many open files)
                closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
            }

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
Let's look at this line: int localRead = doReadMessages(readBuf);
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            // Wrap the accepted JDK channel and hand it to the read loop as a "message".
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
This is simply the logic that accepts a client connection.
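The null check on ch makes more sense next to plain JDK NIO: on a non-blocking ServerSocketChannel, accept() returns null when no connection is pending. The sketch below shows this on the loopback interface. It uses only JDK classes, the name AcceptSketch is hypothetical, and it assumes loopback networking is available where it runs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo of non-blocking accept with plain JDK NIO.
class AcceptSketch {
    // Returns {acceptedBeforeClientConnected, acceptedAfterClientConnected}.
    static boolean[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 lets the OS pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            // No client yet: a non-blocking accept() returns null instead of blocking.
            boolean before;
            try (SocketChannel none = server.accept()) {
                before = none != null;
            }

            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Wait (up to 2 s) until the selector reports the pending connection.
                selector.select(2000);
                try (SocketChannel accepted = server.accept()) {
                    return new boolean[] { before, accepted != null };
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In Netty the accepted channel is not closed as it is here; doReadMessages wraps it in a NioSocketChannel and adds it to the buffer that read() later passes down the pipeline.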
Now look at the line final ChannelPipeline pipeline = pipeline(); and we find:

@Override
public ChannelPipeline pipeline() {
    return pipeline;
}

private final ChannelPipeline pipeline;
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

    static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
    static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();

    static {
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
    }

    private MessageSizeEstimator.Handle estimatorHandle;

    private final Channel parent;
    private final long hashCode = ThreadLocalRandom.current().nextLong();
    private final Unsafe unsafe;
    private final ChannelPipeline pipeline;
    private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
    private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
    private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
    private final CloseFuture closeFuture = new CloseFuture(this);

    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    private volatile EventLoop eventLoop;
    private volatile boolean registered;

    /** Cache for the string representation of this channel */
    private boolean strValActive;
    private String strVal;
    // remaining members omitted
}
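We find that pipeline is declared final. A Channel also wraps several other objects in final fields (parent, unsafe, closeFuture, and so on), which cannot be reassigned once the channel has been constructed. Seeing this, do you get a better feel for Netty's design?
After obtaining the pipeline, the code executes pipeline.fireChannelRead(readBuf.get(i)); and the call is passed on to a ChannelHandlerContext. Let's dig into that method: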
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}
Then we find that next.invokeChannelRead(msg) is where the channelRead method is called back. So now you can see why Netty invokes channelRead once a connection has been established.
private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
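The inEventLoop() check in fireChannelRead follows a simple pattern: if the caller is already on the executor's thread, invoke the handler directly; otherwise hand the call over as a task. The sketch below reproduces that pattern on top of a JDK single-thread executor. All names are hypothetical, and it is a simplification rather than Netty's EventExecutor.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "run inline if on the loop thread, else enqueue" pattern.
class InEventLoopSketch {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-loop");
        t.setDaemon(true);
        loopThread.set(t);   // remember which thread is the loop thread
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    // Returns true when the task ran inline on the calling thread.
    boolean dispatch(Runnable task) {
        if (inEventLoop()) {
            task.run();
            return true;
        }
        executor.execute(task);
        return false;
    }

    // Returns {ranInlineWhenCalledFromOutside, ranInlineWhenCalledFromLoopThread}.
    static boolean[] demo() {
        InEventLoopSketch loop = new InEventLoopSketch();
        boolean[] result = new boolean[2];
        CountDownLatch done = new CountDownLatch(1);
        result[0] = loop.dispatch(() -> {
            // We are now on the loop thread, so a nested dispatch runs inline.
            result[1] = loop.dispatch(() -> { });
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        loop.executor.shutdown();
        return result;
    }
}
```

The point of the pattern is that a handler is always invoked on its own executor's thread, whichever thread triggered the event.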
If, like me, you have always been curious about how this callback is wired up, there is more to discover. Let's trace this line:
final AbstractChannelHandlerContext next = findContextInbound();
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
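Here we discover how ChannelHandlerContext is designed: the contexts form a doubly linked list, each holding a next and a prev reference. In other words, Netty manages the handler contexts of a channel through a doubly linked list. Does that remind you of LinkedList?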
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {

    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    private final boolean inbound;
    private final boolean outbound;
    private final AbstractChannel channel;
    private final DefaultChannelPipeline pipeline;
    private final String name;
    private boolean removed;

    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    final EventExecutor executor;
    private ChannelFuture succeededFuture;

    // Lazily instantiated tasks used to trigger events to a handler with different executor.
    // These needs to be volatile as otherwise an other Thread may see an half initialized instance.
    // See the JMM for more details
    private volatile Runnable invokeChannelReadCompleteTask;
    private volatile Runnable invokeReadTask;
    private volatile Runnable invokeChannelWritableStateChangedTask;
    private volatile Runnable invokeFlushTask;
    // remaining members omitted
}
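To see how the next/prev links and the inbound flag work together, here is a stripped-down sketch of a context chain. The names ContextChainSketch and Ctx are made up, and marking the head as not inbound and the tail as inbound is an assumption of the sketch; findContextInbound is copied in spirit from the method above.

```java
// Hypothetical, simplified context chain; not Netty's DefaultChannelPipeline.
class ContextChainSketch {
    static final class Ctx {
        final String name;
        final boolean inbound;
        Ctx next;
        Ctx prev;

        Ctx(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    // Sentinel nodes; the inbound tail guarantees that the search below terminates.
    final Ctx head = new Ctx("head", false);
    final Ctx tail = new Ctx("tail", true);

    ContextChainSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // Links a new context just before the tail.
    Ctx addLast(String name, boolean inbound) {
        Ctx ctx = new Ctx(name, inbound);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
        return ctx;
    }

    // Walks forward from the given context to the next one that handles inbound events.
    static Ctx findContextInbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
}
```

With a chain head, encoder (outbound only), handler (inbound), tail, a search starting at head skips the encoder and returns handler, and a search starting at handler returns tail.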
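OK, back to the channelRead method. What runs here is the channelRead method of ServerBootstrapAcceptor. We can see that it adds the childHandler, applies the options and attributes configured for the client channel, and registers the channel with the multiplexer (the child group's selector). At this point the whole connection process is clear.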
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // msg is the child channel created in doReadMessages.
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // Register the child channel with one of the worker event loops.
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}