Java NIO (三)

在上一篇中介绍了缓冲区的原理,下面来介绍NIO中另一个核心对象选择器(Selector)以及NIO的原理。

在Client/Server模型中,Server往往需要同时处理大量来自Client的访问请求,因此Server端需采用支持高并发访问的架构。一种简单而又直接的解决方案是“one-thread-per-connection”。这是一种基于阻塞式I/O的多线程模型。在该模型中,Server为每个Client连接创建一个处理线程,每个处理线程阻塞式等待可能达到的数据,一旦数据到达,则立即处理请求、返回处理结果并再次进入等待状态。由于每个Client连接有一个单独的处理线程为其服务,因此可保证良好的响应时间。但当系统负载增大(并发请求增多)时,Server端需要的线程数会增加,这将成为系统扩展的瓶颈所在。

Java NIO (三)

 

Java NIO不但引入了全新的高效的I/O机制,同时引入了基于Reactor设计模式的多路复用异步模式。NIO包中主要包含以下几种核心对象。

* Channel(通道):NIO把它支持的I/O对象抽象为Channel。它模拟了通信连接,类似于原I/O中的流(Stream),用户可以通过它读取和写入数据。实例类有SocketChannel、ServerSocketChannel、DatagramChannel、FileChannel等。

* Buffer(缓冲区):Buffer是一块连续的内存区域,一般作为Channel收发数据的载体出现。所有数据都通过Buffer对象来处理。

* Selector(选择器):Selector类提供了监控一个和多个通道当前状态的机制。只要Channel向Selector注册了某种特定的事件,Selector就会监听这些事件是否会发生,一旦发生某个事件,便会通知对应的Channel。使用选择器,借助单一线程,就可对数量庞大的活动I/O通道实施监控和维护。

Java NIO (三)

 

Java NIO的服务端只需启动一个专门的线程来处理所有的IO事件,这种通信模型是怎么实现的呢?Java NIO采用了双向通道(Channel)进行数据传输,而不是单向的流(Stream),在通道上可以注册我们感兴趣的事件。一共有以下四种事件:

 

事件名 对应值
服务端接收客户端连接事件 SelectionKey.OP_ACCEPT(16) 
客户端连接服务端事件 SelectionKey.OP_CONNECT(8)
读事件 SelectionKey.OP_READ(1)
写事件 SelectionKey.OP_WRITE(4)

服务端和客户端各自维护一个管理通道的对象,我们称之为Selector,该对象能检测一个或多个通道 (Channel) 上的事件。我们以服务端为例,如果服务端的Selector上注册了读事件,某时刻客户端给服务端发送了一些数据,阻塞I/O这时会调用read()方法阻塞地读取数据,而NIO的服务端会在Selector中注册一个读事件,然后服务端的处理线程会轮询地访问Selector,如果发现有读事件到达,则处理这些事件,如果没有则处理线程会一直阻塞直到读事件到达为止。

 

 

为了更好地理解java NIO,下面贴出服务端和客户端的简单代码实现。

服务端:

 

[java] view plain copy

  1. import java.io.IOException;  
  2. import java.net.InetSocketAddress;  
  3. import java.net.ServerSocket;  
  4. import java.nio.ByteBuffer;  
  5. import java.nio.channels.SelectionKey;  
  6. import java.nio.channels.Selector;  
  7. import java.nio.channels.ServerSocketChannel;  
  8. import java.nio.channels.SocketChannel;  
  9. import java.util.Iterator;  
  10. import java.util.Set;  
  11.   
  12. /** 
  13.  * NIO服务端 
  14.  *  
  15.  */  
  16. public class NIOServer {  
  17.   
  18.     private Selector selector;  
  19.     private ByteBuffer echoBuffer = ByteBuffer.allocate(1024);  
  20.   
  21.     /** 
  22.      * 获得一个ServerSocket通道,并对该通道做一些初始化的工作 
  23.      *  
  24.      * @param port 
  25.      * @throws IOException 
  26.      */  
  27.     public void initServer(int port) throws IOException {  
  28.         selector = Selector.open();  
  29.   
  30.         // 获得一个ServerSocket通道,并设置为非阻塞  
  31.         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();  
  32.         serverSocketChannel.configureBlocking(false);  
  33.   
  34.         // 将通道对应的ServerSocket绑定到port端口  
  35.         ServerSocket serverSocket = serverSocketChannel.socket();  
  36.         InetSocketAddress address = new InetSocketAddress(port);  
  37.         serverSocket.bind(address);  
  38.   
  39.         // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件。  
  40.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  
  41.     }  
  42.   
  43.     /** 
  44.      * 循环监听selector上是否有需要处理的事件,如果有,则进行处理 
  45.      *  
  46.      * @throws IOException 
  47.      */  
  48.     public void listen() throws IOException {  
  49.         System.out.println("服务端启动成功!");  
  50.   
  51.         // 循环访问selector  
  52.         while (true) {  
  53.             // //当注册的事件到达时,方法返回;否则,该方法会一直阻塞  
  54.             selector.select();  
  55.   
  56.             // 返回发生了事件的SelectionKey对象的一个 集合  
  57.             Set<SelectionKey> selectedKeys = selector.selectedKeys();  
  58.             Iterator<SelectionKey> it = selectedKeys.iterator();  
  59.   
  60.             while (it.hasNext()) {  
  61.                 SelectionKey key = (SelectionKey) it.next();  
  62.   
  63.                 it.remove(); // 删除已选的key以防重复处理  
  64.   
  65.                 // 客户端请求连接事件  
  66.                 if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {  
  67.                     ServerSocketChannel server = (ServerSocketChannel) key  
  68.                             .channel();  
  69.                     SocketChannel channel = server.accept(); // 获得和客户端连接的通道  
  70.   
  71.                     channel.configureBlocking(false);// 设置成非阻塞  
  72.                     // 在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道注册读事件。  
  73.                     channel.register(this.selector, SelectionKey.OP_READ);  
  74.                 } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {  
  75.                     // 得到可读事件发生的Socket通道  
  76.                     SocketChannel channel = (SocketChannel) key.channel();  
  77.   
  78.                     while (true) {  
  79.                         echoBuffer.clear();  
  80.                         int r = channel.read(echoBuffer);  
  81.   
  82.                         // 判断拷贝是否完成  
  83.                         if (r <= 0) {  
  84.                             break;  
  85.                         }  
  86.   
  87.                         echoBuffer.flip();  
  88.                         channel.write(echoBuffer); // 将消息回送给客户端  
  89.                     }  
  90.   
  91.                     byte[] data = echoBuffer.array();  
  92.                     System.out.println("服务器收到并返回信息:" + new String(data).trim());  
  93.                 }  
  94.             }  
  95.         }  
  96.     }  
  97.   
  98.     /** 
  99.      * 启动服务端测试 
  100.      *  
  101.      * @throws IOException 
  102.      */  
  103.     public static void main(String[] args) throws IOException {  
  104.         NIOServer server = new NIOServer();  
  105.         server.initServer(8000);  
  106.         server.listen();  
  107.     }  
  108.   
  109. }  


客户端:

 

 

 

[java] view plain copy

  1. import java.io.IOException;  
  2. import java.net.InetSocketAddress;  
  3. import java.nio.ByteBuffer;  
  4. import java.nio.channels.SelectionKey;  
  5. import java.nio.channels.Selector;  
  6. import java.nio.channels.SocketChannel;  
  7. import java.util.Iterator;  
  8. import java.util.Set;  
  9.   
  10. /** 
  11.  * NIO客户端 
  12.  *  
  13.  */  
  14. public class NIOClient {  
  15.   
  16.     private Selector selector;  
  17.   
  18.     /** 
  19.      * 获得一个Socket通道,并对该通道做一些初始化的工作 
  20.      *  
  21.      * @param ip连接的服务器的ip 
  22.      * @param port连接的服务器的端口号 
  23.      * @throws IOException 
  24.      */  
  25.     public void initClient(String ip, int port) throws IOException {  
  26.         selector = Selector.open();  
  27.   
  28.         // 获得一个Socket通道,并设置为非阻塞  
  29.         SocketChannel channel = SocketChannel.open();  
  30.         channel.configureBlocking(false);  
  31.   
  32.         // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调  
  33.         // 用channel.finishConnect();才能完成连接  
  34.         channel.connect(new InetSocketAddress(ip, port));  
  35.   
  36.         // 将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。  
  37.         channel.register(selector, SelectionKey.OP_CONNECT);  
  38.     }  
  39.   
  40.     /** 
  41.      * 循环监听selector上是否有需要处理的事件,如果有,则进行处理 
  42.      *  
  43.      * @throws IOException 
  44.      */  
  45.     public void listen() throws IOException {  
  46.         // 循环访问selector  
  47.         while (true) {  
  48.             // //当注册的事件到达时,方法返回;否则,该方法会一直阻塞  
  49.             selector.select();  
  50.   
  51.             // 返回发生了事件的SelectionKey对象的一个 集合  
  52.             Set<SelectionKey> selectedKeys = selector.selectedKeys();  
  53.             Iterator<SelectionKey> it = selectedKeys.iterator();  
  54.   
  55.             while (it.hasNext()) {  
  56.                 SelectionKey key = (SelectionKey) it.next();  
  57.   
  58.                 it.remove(); // 删除已选的key以防重复处理  
  59.   
  60.                 // 连接事件发生  
  61.                 if ((key.readyOps() & SelectionKey.OP_CONNECT) == SelectionKey.OP_CONNECT) {  
  62.                     // 获得和服务器连接的通道  
  63.                     SocketChannel channel = (SocketChannel) key.channel();  
  64.   
  65.                     // 如果正在连接,则完成连接  
  66.                     if (channel.isConnectionPending()) {  
  67.                         channel.finishConnect();  
  68.                     }  
  69.   
  70.                     channel.configureBlocking(false);// 设置成非阻塞  
  71.                     // 在和服务器连接成功之后,为了可以向服务器发送信息,需要为通道注册写事件。  
  72.                     channel.register(this.selector, SelectionKey.OP_WRITE);  
  73.                 } else if ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {  
  74.                     // 得到可写事件发生的Socket通道  
  75.                     SocketChannel channel = (SocketChannel) key.channel();  
  76.   
  77.                     String msg = new String("Hello, Nice to meet you!");  
  78.                     channel.write(ByteBuffer.wrap(msg.getBytes())); // 向服务器发送信息  
  79.   
  80.                     // 在向服务器写完信息之后,为了可以接收到服务器返回的信息,需要给通道注册读事件。  
  81.                     channel.register(this.selector, SelectionKey.OP_READ);  
  82.   
  83.                     System.out.println("客户端发送信息:" + msg);  
  84.                 } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {  
  85.                     // 得到可读事件发生的Socket通道  
  86.                     SocketChannel channel = (SocketChannel) key.channel();  
  87.   
  88.                     ByteBuffer echoBuffer = ByteBuffer.allocate(1024);  
  89.   
  90.                     while (true) {  
  91.                         echoBuffer.clear();  
  92.                         int r = channel.read(echoBuffer);  
  93.   
  94.                         // 判断拷贝是否完成  
  95.                         if (r <= 0) {  
  96.                             break;  
  97.                         }  
  98.                     }  
  99.   
  100.                     byte[] data = echoBuffer.array();  
  101.                     System.out.println("客户端收到信息:" + new String(data).trim());  
  102.                 }  
  103.             }  
  104.         }  
  105.     }  
  106.   
  107.     /** 
  108.      * 启动客户端测试 
  109.      *  
  110.      * @throws IOException 
  111.      */  
  112.     public static void main(String[] args) throws IOException {  
  113.         NIOClient client = new NIOClient();  
  114.         client.initClient("localhost"8000);  
  115.         client.listen();  
  116.     }  
  117.   
  118. }  


输出结果为:

 

服务端:

 

Java NIO (三)

客户端:

Java NIO (三)