netty学习总结

一.netty基本概念:

netty是一个异步的事件驱动的网络应用框架,用于可维护的、高性能协议的服务端和客户端的快速开seda:发把一个请求分为若干个个stages,每一个stages可以用

不同的线程进行处理,不同的stages采用了事件驱动的异步沟通方式进行通信

1.可以作为一个rpc框架,2.作为一个长连接的服务器,3.作为一个http的服务器(类似于springmvc,struts2框架编写的web应用但是并没有实现一个servlet的规范)

二.基本使用:

netty的一些基本代码实例,学习新技术需要成就感,因此要先会用,然后研究原理。

三.netty与Nio:

 1.io模型:

(1)阻塞 IO 模型

最传统的一种 IO 模型,即在读写数据过程中会发生阻塞现象。当用户线程发出 IO 请求之后,内
核会去查看数据是否就绪,如果没有就绪就会等待数据就绪,而用户线程就会处于阻塞状态,用
户线程交出 CPU。当数据就绪之后,内核会将数据拷贝到用户线程,并返回结果给用户线程,用
户线程才解除 block 状态。典型的阻塞 IO 模型的例子为:data = socket.read();如果数据没有就
绪,就会一直阻塞在 read 方法

(2)非阻塞io模型

当用户线程发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。如果结果是一个
error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦内核中的数据准备
好了,并且又再次收到了用户线程的请求,那么它马上就将数据拷贝到了用户线程,然后返回。
所以事实上,在非阻塞 IO 模型中,用户线程需要不断地询问内核数据是否就绪,也就说非阻塞 IO
不会交出 CPU,而会一直占用 CPU。典型的非阻塞 IO 模型一般如下:
while(true){
data = socket.read();
if(data!= error){
处理数据
break;
}
}
但是对于非阻塞 IO 就有一个非常严重的问题,在 while 循环中需要不断地去询问内核数据是否就
绪,这样会导致 CPU 占用率非常高,因此一般情况下很少使用 while 循环这种方式来读取数据。

(3)多路复用的io模型:

多路复用 IO 模型是目前使用得比较多的模型。Java NIO 实际上就是多路复用 IO。在多路复用 IO
模型中,会有一个线程不断去轮询多个 socket 的状态,只有当 socket 真正有读写事件时,才真
正调用实际的 IO 读写操作。因为在多路复用 IO 模型中,只需要使用一个线程就可以管理多个
socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有在真正有
socket 读写事件进行时,才会使用 IO 资源,所以它大大减少了资源占用。在 Java NIO 中,是通
过 selector.select()去查询每个通道是否有到达事件,如果没有事件,则一直阻塞在那里,因此这
种方式会导致用户线程的阻塞。多路复用 IO 模式,通过一个线程就可以管理多个 socket,只有当
socket 真正有读写事件发生才会占用资源来进行实际的读写操作。因此,多路复用 IO 比较适合连
接数比较多的情况。
另外多路复用 IO 为何比非阻塞 IO 模型的效率高是因为在非阻塞 IO 中,不断地询问 socket 状态
时通过用户线程去进行的,而在多路复用 IO 中,轮询每个 socket 状态是内核在进行的,这个效
率要比用户线程要高的多。
不过要注意的是,多路复用 IO 模型是通过轮询的方式来检测是否有事件到达,并且对到达的事件
逐一进行响应。因此对于多路复用 IO 模型来说,一旦事件响应体很大,那么就会导致后续的事件
迟迟得不到处理,并且会影响新的事件轮询。

(4)信号驱动 IO 模型

在信号驱动 IO 模型中,当用户线程发起一个 IO 请求操作,会给对应的 socket 注册一个信号函
数,然后用户线程会继续执行,当内核数据就绪时会发送一个信号给用户线程,用户线程接收到
信号之后,便在信号函数中调用 IO 读写操作来进行实际的 IO 请求操作。

(5)异步 IO 模型:

异步 IO 模型才是最理想的 IO 模型,在异步 IO 模型中,当用户线程发起 read 操作之后,立刻就
可以开始去做其它的事。而另一方面,从内核的角度,当它受到一个 asynchronous read 之后,
它会立刻返回,说明 read 请求已经成功发起了,因此不会对用户线程产生任何 block。然后,内
核会等待数据准备完成,然后将数据拷贝到用户线程,当这一切都完成之后,内核会给用户线程
发送一个信号,告诉它 read 操作完成了。也就说用户线程完全不需要实际的整个 IO
操作是如何
进行的,只需要先发起一个请求,当接收内核返回的成功信号时表示 IO 操作已经完成,可以直接
去使用数据了。
也就说在异步 IO 模型中,IO 操作的两个阶段都不会阻塞用户线程,这两个阶段都是由内核自动完
成,然后发送一个信号告知用户线程操作已完成。用户线程中不需要再次调用 IO 函数进行具体的
读写。这点是和信号驱动模型有所不同的,在信号驱动模型中,当用户线程接收到信号表示数据
已经就绪,然后需要用户线程调用 IO 函数进行实际的读写操作;而在异步 IO 模型中,收到信号
表示 IO 操作已经完成,不需要再在用户线程中调用 IO 函数进行实际的读写操作。
注意,异步 IO 是需要操作系统的底层支持,在 Java 7 中,提供了 Asynchronous IO。
 

2.nio组件:

  • buffer

buffer本身是一块内存,底层实现上,实际上是个数组,数据的读和写是通过buffer来实现的

nio可以允许buffer实现读写的切换,buffer.filp();//读写切换的开关

所有数据的读写都是通过buffer来进行的,永远不会出现直接向channel读取或者写入的情况

public static void main(String [] args)throws  Exception{
    FileOutputStream fileOutputStream = new FileOutputStream("aa.txt");
    FileChannel fileChannel = fileOutputStream.getChannel();
    ByteBuffer byteBuffer = ByteBuffer.allocate(512);
    byte[] message = "hello world ".getBytes();

   for(int i=0;i<message.length;i++){
       byteBuffer.put(message[i]);
   }
   byteBuffer.flip();
   fileChannel.write(byteBuffer);
   fileOutputStream.close();
}

三个属性:position、limit、capacity的解释:

public static void main(String [] args)throws  Exception{
    ByteBuffer byteBuffer = ByteBuffer.allocate(512);
    byte[] message = "hello world ".getBytes();
    byteBuffer.put(message);
    System.out.println("初始状态---"+byteBuffer.position()+":"+byteBuffer.limit());
    byteBuffer.flip();
    while (byteBuffer.remaining()>0){
        byte b = byteBuffer.get();
        System.out.println(b);
    }
    byteBuffer.flip();
    System.out.println("结束状态---"+byteBuffer.position()+":"+byteBuffer.limit());
    byte[] message2 = "hello world hello ".getBytes();
    byteBuffer.put(message2);
    System.out.println("结束状态2---"+byteBuffer.position()+":"+byteBuffer.limit());

}

(1)DirectBuffer:

调用native方法即本地方法在堆外内存生成directBuffer,能够提升效率

如果使用HeapByteBuffer,当与外设进行交互的时候会将jvm中的数据对象拷贝到堆外内存,多了一个拷贝的过程

为什么要拷贝?

虽然操作系统可以访问到堆上的内存但是

考虑到gc的时候如果正在与外设进行io交互,对于标记清除算法的jvm,进行gc的时候会出现混乱

并且堆上维护了一个address,对应了堆外内存的地址,当address被清除的时候,堆外内存的数据也会清除

但是DirectBuffer,他是不会在jvm堆中在生成对象了,直接和io进行交互,也就不会有拷贝的过程,所以称之为零拷贝,堆外内存由操作系统进行维护

public static void main(String [] args) throws  Exception{
    FileInputStream fileInputStream = new FileInputStream("");
    FileChannel inputChannel = fileInputStream.getChannel();
    FileOutputStream fileOutputStream = new FileOutputStream("");
    FileChannel outputChannel = fileOutputStream.getChannel();
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(512);//分配直接内存缓冲
    while(true){
        byteBuffer.clear();
        int read = inputChannel.read(byteBuffer);
        if(-1 == read) break;
        byteBuffer.flip();
        outputChannel.write(byteBuffer);
    }
    fileInputStream.close();
    fileOutputStream.close();
}

(2)mappedByteBuffer

内存映射文件:允许java直接从内存访问的特殊文件

public static void main(String [] args) throws  Exception{
    /**
     * 内存映射文件,直接在内存中修改文件内容
     */
    RandomAccessFile randomAccessFile = new RandomAccessFile("C:\\data\\report\\upload\\2019-06\\ddd.txt","rw");
    FileChannel  fileChannel = randomAccessFile.getChannel();
    int count = 0;
    MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE,0,fileChannel.size());
    for(int x=0;x<fileChannel.size();x++){
        if(mappedByteBuffer.get(x)==10){
            count = x;
            break;
        }
    }
    int length = (int)fileChannel.size();
    byte [] bytes = new byte[length-count];
    mappedByteBuffer.get(bytes,count+1,length-1);
    mappedByteBuffer.flip();
    mappedByteBuffer.put(bytes,0,length-count);
    randomAccessFile.close();
}

(3)Scattering与Gathering:

Scattering将来自于一个channel的数据读到多个buffer,按照顺序,第一个buffer满了之后,读读到第二个,实例:实现数据的分门别类,比如网络操作自定义协议,(请求头1,请求头2,请求体)把第一个请求头读取到第一个buffer,第二个读取到第二个buffer,不用读取之后再次解析数据。

Gathering按照顺序,将多个buffer的内容写到channel中:

public static void main(String [] args) throws  Exception{
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    InetSocketAddress address =  new InetSocketAddress(8898);
    serverSocketChannel.socket().bind(address);
    int messageLength = 9;
    ByteBuffer [] buffers = new ByteBuffer[3];
    buffers[0] = ByteBuffer.allocate(2);
    buffers[1] = ByteBuffer.allocate(3);
    buffers[2] = ByteBuffer.allocate(4);
    SocketChannel  socketChannel = serverSocketChannel.accept();
    while(true){
        int byteRead = 0;
        while(byteRead < messageLength){
          long r = socketChannel.read(buffers);
          byteRead += r;
          System.out.println("byteRead:"+byteRead);
            Arrays.asList(buffers).stream().map(buffer->"position:"+
                    buffer.position()+",limit:"+buffer.limit()).forEach(System.out::println);
        }
        Arrays.asList(buffers).forEach(buffer->{
            buffer.flip();
        });
        long byteWritten = 0;
        while(byteWritten<messageLength){
            long r = socketChannel.write(buffers);
            byteWritten += r;

        }
        Arrays.asList(buffers).forEach(byteBuffer -> {
            byteBuffer.clear();
        });
        System.out.println("read:"+byteRead+","+"write:"+byteWritten);
    }

}

  • channel

与stream不同,channel是双向的,一个流只可能是InputOutStream或者OutputStream,channel打开后可以进行读取或者写入

channel可以更好的反映底层操作系统的真实情况。因为在linux中,底层操作系统的通道就是双向的

  • selector:

普通网络编程:每一个连接都需要服务端启动一个线程来进行处理,比较消耗资源,所以适合并发场景不是很高的场景

传统的基于io的网络编程:

new ServerSocket();

serverSocker.bind(port);//port用于客户端和服务端进行连接的端口,实际的数据传输的端口由服务端选择未被占用的端口中随机分配

while(true){

serverSocket.accept();//阻塞,等待客户端连接

new Thread(socket).run(){

socket.getInputStream();//获取流

}

}

缺点:每个链接都需要一个线程的处理,多个线程之间上下文切换开销很大,当连接上没有数据传递的时候线程始终在运行,造成线程资源的浪费

event:事件(异步的处理)

nio编程模型特点:服务端可以使用一个线程来处理多个客户端的连接,最终处理每个连接是同一个线程,通过事件来触发。

代码1:

服务器端监听5个端口号,一个线程处理所有客户端的连接

public static void main(String [] args) throws  Exception{
    int [] ports = new int[5];
    ports[0] = 5000;
    ports[1] = 5001;
    ports[2] = 5002;
    ports[3] = 5003;
    ports[4] = 5004;
    Selector selector = Selector.open();

   for(int i =0;i<ports.length;i++){
       ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
       serverSocketChannel.configureBlocking(false);//配置channel是否阻塞
       ServerSocket serverSocket = serverSocketChannel.socket();
       InetSocketAddress address = new InetSocketAddress(ports[i]);
       serverSocket.bind(address);
       serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//注册selecttor并且标识感兴趣的事件,连接事件
   }
   while (true){
       int numbers = selector.select();//表示选择的数量,即key-set的数量
       //获取到selected-key的集合即获取到事件
       Set<SelectionKey> selectionKeys =  selector.selectedKeys();
       Iterator<SelectionKey> iterator = selectionKeys.iterator();
       int byteRead = 0;
       while(iterator.hasNext()){
           SelectionKey selectionKey = iterator.next();
           if(selectionKey.isAcceptable()){//
               //真正的连接,selector与channel关联
               ServerSocketChannel  serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
               SocketChannel  socketChannel =  serverSocketChannel.accept();
               socketChannel.configureBlocking(false);
               socketChannel.register(selector,SelectionKey.OP_READ);//将读事件添加到集合,这样才会到else if当中
               iterator.remove();//一定要删除掉,表示这个事件已经结束,否则会重复监听
               System.out.println("获取到了客户端的连接"+socketChannel);
           }else  if(selectionKey.isReadable()){
               SocketChannel  socketChannel = (SocketChannel) selectionKey.channel();
               while(true){
                   ByteBuffer  byteBuffer =  ByteBuffer .allocate(512);
                   byteBuffer.clear();
                   int read  = socketChannel.read(byteBuffer);
                   if(read<=0)break;
                   byteBuffer.flip();
                   socketChannel.write(byteBuffer);
                   byteRead += read;
               }
               System.out.println("读取:" + byteRead + ","+"来自于:" + socketChannel);
               iterator.remove();
           }
       }
   }

}

代码2:简单的聊天程序:

  服务端:

public class NioTestServer8 {
    private static Map<String,SocketChannel> clientMap = new HashMap<>();//维护客户端的连接信息
    public static void main(String [] args) throws Exception{
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//建立连接的作用
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(8899));
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//开始连接的时候只有一个事件,关注连接事件
        while(true){
            try {
                selector.select();
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                selectionKeySet.forEach(selectionKey -> {
                    final SocketChannel client ;
                    try{
                        if(selectionKey.isAcceptable()){
                            //连接建立,获取到连接到的channl,注册更多的感兴趣的事件
                             ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel)selectionKey.channel();
                             client = serverSocketChannel1.accept();//真实的连接
                            client.configureBlocking(false);
                            client.register(selector,SelectionKey.OP_READ);//关注读取事件
                            String uuid = UUID.randomUUID()+"";
                            clientMap.put(uuid,client);
                        }else if(selectionKey.isReadable()){
                            client = (SocketChannel)selectionKey.channel();
                            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                            int count = client.read(readBuffer);
                            if(count>0){
                                readBuffer.flip();
                                Charset charset = Charset.forName("utf-8");
                                String receiveMessage = String.valueOf(charset.decode(readBuffer).array());
                                System.out.println(client + ":" + receiveMessage);
                                String sendKey = "";
                                for(Map.Entry<String, SocketChannel> entry:clientMap.entrySet()){
                                    if(client == entry.getValue()){
                                        sendKey = entry.getKey();
                                        break;
                                    }
                                }
                                for(Map.Entry<String, SocketChannel> entry:clientMap.entrySet()){
                                    SocketChannel value = entry.getValue();
                                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                    writeBuffer.put((sendKey +":"+receiveMessage).getBytes());
                                    writeBuffer.flip();
                                    value.write(writeBuffer);
                                }
                            }
                        }
                    }catch (Exception e){

                    }
                });
               selectionKeySet.clear();
               //开始的时候连接通道标识了连接事件,只对连接事件感兴趣
                //获取到真正的通道的时候标识了读事件,只对读事件感兴趣
                //每次访问后都remove()相应的SelectionKey,但是移除了selectedKeys中的SelectionKey不代表移除了selector中的channel信息(这点很重要)
            }catch (Exception e){

            }
        }

    }
}

 

客户端:

package nio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NioTestClient8 {
    public static void main(String [] args){
        try{
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress("127.0.0.1",8899));
            while(true){
                selector.select();
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                for(SelectionKey selectionKey : selectionKeySet){
                    SocketChannel client = (SocketChannel)selectionKey.channel();
                    if(selectionKey.isConnectable()){
                        if(client.isConnectionPending()){
                            //连接是否处在正在进行的状态
                            client.finishConnect();//完成连接,连接已经建立
                            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                            writeBuffer.put((LocalDateTime.now()+"连接成功").getBytes());
                            writeBuffer.flip();
                            client.write(writeBuffer);
                            ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
                            executorService.submit(new Runnable() {
                                @Override
                                public void run() {
                                    while(true){
                                        try {
                                            writeBuffer.clear();
                                            InputStreamReader inputStreamReader = new InputStreamReader(System.in);
                                            BufferedReader br = new BufferedReader(inputStreamReader);
                                            String sendMessage = br.readLine();
                                            writeBuffer.put(sendMessage.getBytes());
                                            writeBuffer.flip();
                                            client.write(writeBuffer);
                                        } catch (Exception e) {
                                            e.printStackTrace();
                                        }

                                    }
                                }
                            });

                        }
                        client.register(selector,SelectionKey.OP_READ);
                    } else if(selectionKey.isReadable()){
                        SocketChannel socketChannel1 = (SocketChannel)selectionKey.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int count = client.read(byteBuffer);
                        if(count>0){
                            String receiveMessage = new String(byteBuffer.array(),0,count);
                            System.out.println(receiveMessage);
                        }
                    }
                    selectionKeySet.clear();
                }


            }

        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

 

3.零拷贝深入分析:

(1)普通io操作:

netty学习总结

 

 

linux系统上用户空间:userspace比如socketChannel

内核空间:kernel space

hardware:硬件,或者外设

用户空间向内核空间发送读取数据的指令,从用户空间模式切换成内核空间模式,内核空间通过直接内存访问磁盘读取数据到内核空间的缓冲区当中,然后再把缓冲区的数据拷贝到用户空间的缓冲区。然后开始执行逻辑代码,read操作完毕。

write操作:将读到的数据拷贝到内核空间,内核空间将数据写到外设(socketChannel)

(2)系统层面的零拷贝:

netty学习总结

 

sendfile指令:用户空间没有拷贝操作,全部都是在内核空间进行,而且只有两次上下文的切换

(3)通过Scatter和gather直接读取数据到内核空间缓冲区:

netty学习总结

 

 

(4)真正的零拷贝全流程:

netty学习总结

dma直接内存访问

将硬盘的数据拷贝到内核空间缓冲区,将数据描述符(引用地址)拷贝到socket buffer,这就是scatter操作,协议引擎就使用gather操作读取这两个数据发送出去

 

 

四.Reactor模式:

netty的最最基础模式:

Doug lea写的scalable IO in java是对reator模式的最好阐述,读懂了这个文档就读懂了reator

其他文档:https://www.cnblogs.com/crazymakercircle/p/9833847.html

基础设计(单线程):就是一个线程,能够检测客户端向服务端发送的连接,当连接(事件)建立之后,reactor会分发正确的handler进行处理,acceptor的作用就是接受连接,也就是连接事件。

netty学习总结

Reactor意图:处理一个或者多个客户端并发的向一个应用发送请求。

reactor一共有五种角色:

netty学习总结

Handle:句柄或者描述符,本质上表示一种资源是由操作系统提供:该资源用于表示一个个的事件,比如文件描述符,或者网络编程中的socket描述符。既可以来自内部,也可以来自外部,比如客户端向服务端发送数据,内部事件:操作系统产生的定时器事件。

 

synchronize event demultiplexer 同步事务分离器:阻塞的

本身是一个系统调用,用于等待事件的发生,事件可能是一个或者多个,调用方在调用的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止。对于linux来说,指的就是常用的io多路复用比如select,poll,epoll.对应nio中的selector

event Handler 事件处理器:

本身由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的反馈机制

concrete event handler 具体事件处理器,是event handler的实现,本质上就是处理器的实现

initiation disapatcher 初始分发器:

实际上就是Reactor角色,本身定义了一些规范,这些规范用于控制事件的调度方式,同时又提供了应用进行事件处理器的注册、删除等设施。本身是事件处理器的核心所在,会通过同步事件分离器等待事件的发生,首先会分理出每一个事件,然后调用事件处理器,然后调用相关的回调方法来处理这些事件。

 

总结nio和reactor:

nio中的selector是一种最简扑的reactor模式:

nio 首先open一个ServerSocketChannel,然后configureBlocking为false,然后或缺serversocket并绑定端口。

打开selector,注册一个连接事件

死循环:不断监听不同通道产生的时间

遍历selectionKey事件,可以获取到对应的通道,将通道和通道感兴趣的时间注册到selector中

业务逻辑处理不同的事件。

 

reactor对nio进行封装:1.当应用想initiation disapatcher注册具体事件处理器时,应用会标识出该事件处理器希望initiation disapatcher在某个事件发生的时候向其通知该事件,该事件与handle关联

2.initiation disapatcher会要求每个事件处理器向其内部传递handle,该handle向操作系统标识了事件处理器

3.当所有的事件处理器注册完毕后,应用会调用handle_event来启动initiation disapatcher的事件循环

这时,initiation disapatcher会将每个事件处理器的handle合并起来,并使用同步事件分离器等待事件的发生,比如说tcp协议层会使用同步事件分离器操作来等待客户端发送的数据到连接的socket handle上

4.当与某个事件对应的handle变成ready状态的时候,比如说 tcp,socket 变为等待状态时,同步事件分离器就会通知initiation disapatcher

5.initiation disapatcher会触发事件处理器的回调方法,从而响应这个处于ready状态的handle,当事件发生的时候,initiation disapatcher会被事件源**handle作为key来寻找分发恰当的事件处理器回调方法

6.initiation disapatcher会回调事件处理器的handle_events回调方法来执行特定于应用的功能,从而响应这个事件,所发生的的事件类型可以作为该方法参数并被该方法内部使用来执行额外的特定于服务的分离和分发

五.netty源码分析:

 

六.netty的实际应用: