Netty源码简析之客户端新连接接入
总结:
Netty新连接注册流程:
Netty在服务端绑定NioEventLop,并轮询到accept事件,服务端调用jdk底层accept()方法获取客户端channel,并且封装成客户端的NioSocketChannel。并且创建一系列的组件,如unsafe(读写)、pipeline(数据、业务处理)。服务端通过ServerBootStrapAcceptorg给当前客户端分配channel,并绑定到唯一的selector上。最后通过传播(pipeline.fireChannelActive())将客户端channel向selecto注册读事件
举一个Netty服务端创建例子如下:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new AuthHandler());
//.................
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
首先来看一下netty是如何检测新连接的(会扫描并处理非阻塞)
首先我们看回上文的《NIOEventLoop解析》,连接为 https://blog.csdn.net/qq_28666081/article/details/102540045 processSelectedKey() (源码位置:NioEventLoop#processSelectedKey)处理IO事件的方法
首先看读事件的收取
// NioEventLoop.java
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
继续往下跟踪,可定位到对应的消息读取(服务端在NioMessageUnsafe,客户端在NioByteUnsafe),里面写了一个while用于检测
// NioMessageUnsafe
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
// 轮询获取
do {
// 从readBuf读取数据
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 累加消息读取的数量
allocHandle.incMessagesRead(localRead);
// 是否可继续读
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
大致看完上面,我们doReadMessages()方法 -- int localRead = doReadMessages(readBuf);
-
由下面可见使用的是jdk底层的accept方法创建一个channel连接,并封装为NioSocketChannel放到readBuf中
// NioServerSocketChannel.java
protected int doReadMessages(List<Object> buf) throws Exception {
// 使用jdk底层accept()方法获取一个channel
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
// 将获取到的channel包装成NioSocketChannel,并放到readBuf中
// NioSocketChannel这个就不看源码,其基本过程是创建id、unsafe、pipeline,并且设置为非阻塞,并且还禁用了Nagle算法的机制
buf.add(new NioSocketChannel(this, ch));
// 返回添加成功,用于while中allocHandle.incMessagesRead(localRead)统计
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
然后我们继续看doReadMessages()下的一个方法allocHandle.incMessagesRead(localRead);
// MaxMessageHandle
public final void incMessagesRead(int amt) {
totalMessages += amt;
}
然后继续看while循环里面的判断方法,其逻辑大致如下
public boolean continueReading() {
// 判断是否开启已读
// totalMessages是否小于16
return config.isAutoRead() &&
attemptedBytesRead == lastBytesRead &&
totalMessages < maxMessagePerRead &&
totalBytesRead < Integer.MAX_VALUE;
}
上面已经分析了NioMessageUnsafe.read()方法是如何接收一个客户端连接并创建NioSocketChannel的,并存储到readBuf中
-
接下来看一下readBuf如何处理,其处理第一个过程如下
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 处理readBuf
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
跟踪fireChannelRead方法到如下:
// ServerBootstrap.ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
// 设置options
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
// 设置attrs
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 调用netty服务端初始化的选择器chooser获取一个NioEventLoop,注册在selector上,此时还未有任何绑定事件
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
分析1:设置的options和attrs均是在创建Nio服务端时,用户自己设置的,如下图:
分析2:如何使用chooser?
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
..............
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public EventExecutor next() {
return chooser.next();
}
最后调用到这里获取一个NioEventLoop,详情为什么要用&获取键值可见上一篇分析https://blog.csdn.net/qq_28666081/article/details/102540045
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
然后看完上面新连接是如何NioEventLoop分配并注册到Selector,接下来看看新连接NioSocketChannel读时间的注册
从AbstractChannel.java的register0()开始断点跟踪,并进入pipeline.fireChannelActive();方法跟踪到如下:
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
继续跟踪如下:
// DefaultChannelPipeline.java
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
最最最后跟踪到了以下selector的注册过程:
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 将读事件注册到selector上!!!!!!!!!!!!!!!
selectionKey.interestOps(interestOps | readInterestOp);
}
}