netty4(二)ServerBootstrap的启动一

EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
//负责数据读写
EventLoopGroup workerGroup = new NioEventLoopGroup();
//服务类
try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    //reactor模型接收和处理
    bootstrap.group(bossGroup, workerGroup)
            //channelFactory属性为ReflectiveChannelFactory通过反射创建NioServerSocketChannel对象
            .channel(NioServerSocketChannel.class) // (3)
            .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new DiscardServerHandler());
                }
            })
            //完成三次握手后等待队列的大小,超过了则拒绝,默认50
            .option(ChannelOption.SO_BACKLOG, 128)          // (5)
            .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

    // Bind and start to accept incoming connections.
    ChannelFuture f = bootstrap.bind(5555).sync(); 
就从启动bind()方法开始进入到

AbstractBootstrap

netty4(二)ServerBootstrap的启动一

然后进入dobind方法

private ChannelFuture doBind(final SocketAddress localAddress) {
    //返回DefaultChannelPromise,注册线程池到channel
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        //在这一点上,我们知道注册是完整和成功的
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        省略。。。。。
    }
}

然后我们先执行,在这边创建channel,然后先init初始化,返回到

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //创建之前设置的NioServerSocketChannel对象
        channel = channelFactory.newChannel();
        //服务端,初始化channel属性,并且设置pipeline,新建ServerBootstrapAcceptor
        //客户端ChannelInitializer初始化
        init(channel);
    } catch (Throwable t) {
        大致就是注册失败,我们注册到全局线程去执行
    }
    //ServerBootstrapConfig里面放了ServerBootstrap,获得线程池最后到SingleThreadEventLoop里面把线程池注册给channel
    //我们创建的channelpipeline也在这里加入
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

   
    return regFuture;
}

调用这里面的init()方法,主要就是设置channel属性和初始化一个ChannelInitializer

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        //之前填的属性射到channel        setChannelOptions(channel, options, logger);
    }

   省略。。。。

    ChannelPipeline p = channel.pipeline();

    省略。。。。

    //查到headtail之间,后面会初始化这个ChannelInitializer
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

上面执行完了返回到AbstractBootstrap的initAndRegister方法,接着执行这个

//ServerBootstrapConfig里面放了ServerBootstrap,获得线程池最后到SingleThreadEventLoop里面把线程池注册给channel
//我们创建的channelpipeline也在这里加入
ChannelFuture regFuture = config().group().register(channel);
先去MultithreadEventLoopGroup的方法里面选出一个线程池
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

最后到AbstractChannel的方法里面去注册,这个promise就是返回用来做sync()的东西,这里一个线程池就一个线程

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    。。。。。

    AbstractChannel.this.eventLoop = eventLoop;

    //是否启动这个线程池
    if (eventLoop.inEventLoop()) {
        //注册channelselect,同时设置promise完成
        register0(promise);
    } else {
        try {
            //提交并且启动线程池
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
           。。。。。。
        }
    }
}

然后调用register0方法

private void register0(ChannelPromise promise) {
    try {
        
        //AbstractNioChannel的方法,注册到select
        doRegister();
        
        //调用回调函数
        pipeline.invokeHandlerAddedIfNeeded();

        //设置promise完成,避免阻塞
        safeSetSuccess(promise);
        //ChannelInitializer的东西添加到pipeline
        pipeline.fireChannelRegistered();
       
        //如果通道从未注册过,则只启用通道**。如果通道被撤销注册并重新注册,这将防止触发多个通道活动。
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                。。。。。
                //设置读,还有感兴趣的东西accept之类的
                beginRead();
            }
        }
    } catch (Throwable t) {
      。。。。。
    }
}

进去doRegister看看

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            //channel注册到selector里面,但为什么是个0呢返回之后增加兴趣事件
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}

返回上一个register0方法这边初始化刚才添加的管道调用initlize方法,网管道里面加上一个ServerBootstrapAcceptor

因为管道是分为inbond和outbond的读取和输出的时候要搞清楚

//ChannelInitializer的东西添加到pipeline
pipeline.fireChannelRegistered();

返回上一个register0方法,在这边注册读感兴趣

//设置读,还有感兴趣的东西accept之类的
beginRead();

然后这边运行完了就返回到doBind方法

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    //这个方法在channelregister()被触发之前被调用。给用户处理程序一个机会,在它的通道注册()实现中设置管道
    //获得NioEventLoop里面的线程池
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                //AbstractChannel里面,经过pipeline最后应该是给实际的channel注册地址吧
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
然后弄完了再返回就完成了服务端启动了