netty源码分析一

前段时间比较忙,没有时间去学习啦,现在稍微好一点啦,看着自己年初定下的目标,还有很多没有完成的呀。从下阶段起学习netty框架啦,之前也了解netty框架,但是没有深入理解,现在好好学习一下。

学习netty框架之前,要了解java nio基本知识java nio常见的API有

channel 

channel可以理解为通道,其中包括TCP/UDP还有文件

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

buffer 

buffer可以理解为一块缓存或者是一块内存,其中包括

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

selector  

Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便

这是在一个单线程中使用一个Selector处理3个Channel的图示:

netty源码分析一

要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。


java nio是一个网络编程基础,其中tomcat jetty服务器也使用java nio思想和技术,有时间可以看看里面的源码对比一下。

netty和mina都是java网络编程的框架 而且这两个框架出自于同一个作者。今天就netty4.1.9源码分析一下:

今天首先分析Bootstrap和ServerBootstrap这个两个启动类。这两个类分别客户端启动和服务器启动。他们都继承AbstractBootstrap这个抽象类

这个类里几个主要方法:

 public B group(EventLoopGroup group)
  {
    if (group == null) {
      throw new NullPointerException("group");
    }
    if (this.group != null) {
      throw new IllegalStateException("group set already");
    }
    this.group = group;
    return this;
  }

  public B channel(Class<? extends C> channelClass)
  {
    if (channelClass == null) {
      throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory(channelClass));
  }


  @Deprecated
  public B channelFactory(ChannelFactory<? extends C> channelFactory)
  {
    if (channelFactory == null) {
      throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
      throw new IllegalStateException("channelFactory set already");
    }


    this.channelFactory = channelFactory;
    return this;
  }


  public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory)
  {
    return channelFactory(channelFactory);
  }

分别是获得当前的线程池,channel通道方式和channelFactory工厂模式方式


今天不会详细见客户端和服务器详细的启动流程,今天让大家看看客户端启动时一个非常重要的方法是connect和服务器启动一个非常重要的方法是bind

 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress)
  {
    if (remoteAddress == null) {
      throw new NullPointerException("remoteAddress");
    }
    validate();
    return doResolveAndConnect(remoteAddress, localAddress);
  }


public ChannelFuture bind(SocketAddress localAddress)
  {
    validate();
    if (localAddress == null) {
      throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
  }

他们俩的方法代码非常像都有一个校验方法,分别是连接服务器方法和监听客户端请求。详细的连接和监听先看看下次分析。

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress)
  {
    ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
      if (!regFuture.isSuccess()) {
        return regFuture;
      }
      return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    }


    final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener()
    {
      public void operationComplete(ChannelFuture future)
        throws Exception
      {
        Throwable cause = future.cause();
        if (cause != null)
        {
          promise.setFailure(cause);
        }
        else
        {
          promise.registered();
          Bootstrap.this.doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
        }
      }
    });
    return promise;
  }


private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
      return regFuture;
    }


    if (regFuture.isDone())
    {
      ChannelPromise promise = channel.newPromise();
      doBind0(regFuture, channel, localAddress, promise);
      return promise;
    }


    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener()
    {
      public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null)
        {
          promise.setFailure(cause);
        }
        else
        {
          promise.registered();


          AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise);
        }
      }
    });
    return promise;
  }

理解其中一个,理解另一个就比较轻松了。