netty4(二)ServerBootstrap的启动一
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) //负责数据读写 EventLoopGroup workerGroup = new NioEventLoopGroup(); //服务类 try { ServerBootstrap bootstrap = new ServerBootstrap(); //reactor模型接收和处理 bootstrap.group(bossGroup, workerGroup) //channelFactory属性为ReflectiveChannelFactory通过反射创建NioServerSocketChannel对象 .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) //完成三次握手后等待队列的大小,超过了则拒绝,默认50 .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = bootstrap.bind(5555).sync();就从启动bind()方法开始进入到
AbstractBootstrap
然后进入dobind方法
private ChannelFuture doBind(final SocketAddress localAddress) { //返回DefaultChannelPromise,注册线程池到channel final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. //在这一点上,我们知道注册是完整和成功的 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { 省略。。。。。 } }
然后我们先执行,在这边创建channel,然后先init初始化,返回到
final ChannelFuture initAndRegister() { Channel channel = null; try { //创建之前设置的NioServerSocketChannel对象 channel = channelFactory.newChannel(); //服务端,初始化channel属性,并且设置pipeline,新建ServerBootstrapAcceptor //客户端ChannelInitializer初始化 init(channel); } catch (Throwable t) { 大致就是注册失败,我们注册到全局线程去执行 } //ServerBootstrapConfig里面放了ServerBootstrap,获得线程池最后到SingleThreadEventLoop里面把线程池注册给channel //我们创建的channel的pipeline也在这里加入 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
调用这里面的init()方法,主要就是设置channel属性和初始化一个ChannelInitializer
void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { //之前填的属性射到channel里 setChannelOptions(channel, options, logger); } 省略。。。。 ChannelPipeline p = channel.pipeline(); 省略。。。。 //查到head与tail之间,后面会初始化这个ChannelInitializer p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
上面执行完了返回到AbstractBootstrap的initAndRegister方法,接着执行这个
//ServerBootstrapConfig里面放了ServerBootstrap,获得线程池最后到SingleThreadEventLoop里面把线程池注册给channel //我们创建的channel的pipeline也在这里加入 ChannelFuture regFuture = config().group().register(channel);先去MultithreadEventLoopGroup的方法里面选出一个线程池
public ChannelFuture register(Channel channel) { return next().register(channel); }
最后到AbstractChannel的方法里面去注册,这个promise就是返回用来做sync()的东西,这里一个线程池就一个线程
public final void register(EventLoop eventLoop, final ChannelPromise promise) { 。。。。。 AbstractChannel.this.eventLoop = eventLoop; //是否启动这个线程池 if (eventLoop.inEventLoop()) { //注册channel到select,同时设置promise完成 register0(promise); } else { try { //提交并且启动线程池 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { 。。。。。。 } } }
然后调用register0方法
private void register0(ChannelPromise promise) { try { //AbstractNioChannel的方法,注册到select doRegister(); //调用回调函数 pipeline.invokeHandlerAddedIfNeeded(); //设置promise完成,避免阻塞 safeSetSuccess(promise); //把ChannelInitializer的东西添加到pipeline pipeline.fireChannelRegistered(); //如果通道从未注册过,则只启用通道**。如果通道被撤销注册并重新注册,这将防止触发多个通道活动。 if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { 。。。。。 //设置读,还有感兴趣的东西accept之类的 beginRead(); } } } catch (Throwable t) { 。。。。。 } }
进去doRegister看看
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { //把channel注册到selector里面,但为什么是个0呢返回之后增加兴趣事件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
返回上一个register0方法这边初始化刚才添加的管道调用initlize方法,网管道里面加上一个ServerBootstrapAcceptor
因为管道是分为inbond和outbond的读取和输出的时候要搞清楚
//把ChannelInitializer的东西添加到pipeline pipeline.fireChannelRegistered();
返回上一个register0方法,在这边注册读感兴趣
//设置读,还有感兴趣的东西accept之类的 beginRead();
然后这边运行完了就返回到doBind方法
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. //这个方法在channelregister()被触发之前被调用。给用户处理程序一个机会,在它的通道注册()实现中设置管道 //获得NioEventLoop里面的线程池 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { //到AbstractChannel里面,经过pipeline最后应该是给实际的channel注册地址吧 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }然后弄完了再返回就完成了服务端启动了