Netty源码简析之客户端新连接接入

总结:
Netty新连接注册流程:
Netty在服务端绑定NioEventLop,并轮询到accept事件,服务端调用jdk底层accept()方法获取客户端channel,并且封装成客户端的NioSocketChannel。并且创建一系列的组件,如unsafe(读写)、pipeline(数据、业务处理)。服务端通过ServerBootStrapAcceptorg给当前客户端分配channel,并绑定到唯一的selector上。最后通过传播(pipeline.fireChannelActive())将客户端channel向selecto注册读事件
 
举一个Netty服务端创建例子如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
            .handler(new ServerHandler())
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(new AuthHandler());
                    //.................
                }
            });
 
 
    ChannelFuture f = b.bind(8888).sync();
 
    f.channel().closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}
 
首先来看一下netty是如何检测新连接的(会扫描并处理非阻塞)
首先我们看回上文的《NIOEventLoop解析》,连接为 https://blog.csdn.net/qq_28666081/article/details/102540045   processSelectedKey()  (源码位置:NioEventLoop#processSelectedKey)处理IO事件的方法
首先看读事件的收取
// NioEventLoop.java
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        // Connection already closed - no need to handle write.
        return;
    }
}
 
继续往下跟踪,可定位到对应的消息读取(服务端在NioMessageUnsafe,客户端在NioByteUnsafe),里面写了一个while用于检测
// NioMessageUnsafe
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);
 
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            // 轮询获取
            do {
                 // 从readBuf读取数据
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                // 累加消息读取的数量
                allocHandle.incMessagesRead(localRead);
                // 是否可继续读
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
 
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
 
        if (exception != null) {
            closed = closeOnReadError(exception);
 
        pipeline.fireExceptionCaught(exception);
    }
 
    if (closed) {
        inputShutdown = true;
        if (isOpen()) {
            close(voidPromise());
        }
    }
} finally {
    // Check if there is a readPending which was not processed yet.
    // This could be for two reasons:
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
    //
    if (!readPending && !config.isAutoRead()) {
        removeReadOp();
    }
}
 
大致看完上面,我们doReadMessages()方法 -- int localRead = doReadMessages(readBuf);
  • 由下面可见使用的是jdk底层的accept方法创建一个channel连接,并封装为NioSocketChannel放到readBuf中
// NioServerSocketChannel.java
protected int doReadMessages(List<Object> buf) throws Exception {
    // 使用jdk底层accept()方法获取一个channel
    SocketChannel ch = javaChannel().accept();
 
    try {
        if (ch != null) {
            // 将获取到的channel包装成NioSocketChannel,并放到readBuf中
            // NioSocketChannel这个就不看源码,其基本过程是创建id、unsafe、pipeline,并且设置为非阻塞,并且还禁用了Nagle算法的机制
            buf.add(new NioSocketChannel(this, ch));
            // 返回添加成功,用于while中allocHandle.incMessagesRead(localRead)统计
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);
        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }
 
    return 0;
}
 
然后我们继续看doReadMessages()下的一个方法allocHandle.incMessagesRead(localRead);
// MaxMessageHandle
public final void incMessagesRead(int amt) {
    totalMessages += amt;
}
 
然后继续看while循环里面的判断方法,其逻辑大致如下
public boolean continueReading() {
    // 判断是否开启已读
    // totalMessages是否小于16
    return config.isAutoRead() &&
        attemptedBytesRead == lastBytesRead &&
        totalMessages < maxMessagePerRead &&
        totalBytesRead < Integer.MAX_VALUE;
}
 
上面已经分析了NioMessageUnsafe.read()方法是如何接收一个客户端连接并创建NioSocketChannel的,并存储到readBuf中
  • 接下来看一下readBuf如何处理,其处理第一个过程如下
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
    readPending = false;
    // 处理readBuf
    pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
 
跟踪fireChannelRead方法到如下:
// ServerBootstrap.ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
 
    child.pipeline().addLast(childHandler);
    // 设置options
    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }
    // 设置attrs
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
 
    try {
        // 调用netty服务端初始化的选择器chooser获取一个NioEventLoop,注册在selector上,此时还未有任何绑定事件
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
分析1:设置的options和attrs均是在创建Nio服务端时,用户自己设置的,如下图:
 
Netty源码简析之客户端新连接接入
 
分析2:如何使用chooser?
childGroup.register(child).addListener(new ChannelFutureListener() {
    @Override
    ..............
 
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
 
public EventExecutor next() {
    return chooser.next();
}
最后调用到这里获取一个NioEventLoop,详情为什么要用&获取键值可见上一篇分析https://blog.csdn.net/qq_28666081/article/details/102540045 
public EventExecutor next() {
    return executors[idx.getAndIncrement() & executors.length - 1];
}
 
然后看完上面新连接是如何NioEventLoop分配并注册到Selector,接下来看看新连接NioSocketChannel读时间的注册
从AbstractChannel.java的register0()开始断点跟踪,并进入pipeline.fireChannelActive();方法跟踪到如下:
private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}
继续跟踪如下:
// DefaultChannelPipeline.java
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
最最最后跟踪到了以下selector的注册过程:
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
 
    readPending = true;
 
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        // 将读事件注册到selector上!!!!!!!!!!!!!!!
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}