2.2 Java 非阻塞io(NIO)
Java 非阻塞io(NIO)
执行流程
NIO(reactor模型):线程发起IO请求,立即返回;内核在做好IO操作的准备之后,通过调用注册的回调函数通知线程做IO操作,线程开始阻塞,直到操作完成。
ServerSocketChannel的使用方式是面向服务器端的,一般的开发流程是:
- 获取一个ServerSocketChannel。
- 设置网络操作,这些参数主要是和TCP协议有关。
- 将ServerSocketChannel注册到Selector(多路复用器)。
- 将ServerSocketChannel和某个具体的地址绑定。
- 用户像多路复用器设置感兴趣的IO事件。
- 用户线程以阻塞或非阻塞方式轮询Selector来查看是否有就绪的IO事件。
- 用户针对不同的IO事件对Channel进行具体的IO操作。
SocketChannel主要是面向客户端的开发的,也是以open方式获取channel,客户端的开发流程大致如下:
- 获取一个SocketChannel。
- 设置Channel为非阻塞方式。
- 获取Selector。
- 将channel注册到Selector,并监听CONNECT事件。
- 调用channel的connect方法连接指定的服务器和端口。
- 如果连接成功则进行IO操作,如果没成功则轮询Selector处理CONNECT事件。
重点
- 当channel通道执行时间过长时,一样会进入阻塞
- NIO执行快的原因。
1.)byte buffer块读取不byte读取块
2.)只有当select()有数据时,才进入accept方法。防止一直监听等待。
ByteBuffer存储
一种是堆内存,另一种是直接内存,主要区别:
- 堆内存分配和回收比较快,但是网络数据需要从内核copy到堆中。
- 直接内存分配和回收比较慢,但是免去了从内核copy到堆中的一次copy
1.Server
-
开启通道选择器Selector
-
创建ServerSocketChannel通道 并绑定接口
-
ServerSocketChannel 通道注册到selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)并指定SelectionKey(通道类型) -
调用selector.select()方法检查IO就绪事件
-
如果就绪
1.)serverSocketChannel.accept()开始处理
2.Client
- SocketChannel.open()开启channel
- bind绑定IP和端口
- 开始读写
代码展示
Server
SelectionKey代表着一个Channel和Selector的关系的抽象,Channel向Selector注册的时候产生,由Selector维护。Selector维护着四个SelectionKey的集合:
SelectionKey.OP_READ
对应 00000001,通道中有数据可以进行读取
SelectionKey.OP_WRITE
对应 00000100,可以往通道中写入数据
SelectionKey.OP_CONNECT
对应 00001000,成功建立 TCP 连接
SelectionKey.OP_ACCEPT
对应 00010000,接受 TCP 连接
// 创建通道选择器。
Selector selector = Selector.open();
// 创建通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8081));
// 将通道注册到selector上
// 如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
serverSocketChannel.configureBlocking(false);
// 可以使用数组组合
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 获取通道数目
int readyChannels = selector.select();
if(readyChannels == 0){
continue;
}
// 获取SelectionKey既对通道的操作
Set<SelectionKey> readyKey = selector.selectedKeys();
// 遍历
Iterator<SelectionKey> iterator = readyKey.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
// if(key.isAcceptable()) {
// // a connection was accepted by a ServerSocketChannel.
// } else if (key.isConnectable()) {
// // a connection was established with a remote server.
// } else if (key.isReadable()) {
// // a channel is ready for reading
// } else if (key.isWritable()) {
// // a channel is ready for writing
// }
if (key.isAcceptable()) {
// 有已经接受的新的到服务端的连接
SocketChannel socketChannel = serverSocketChannel.accept();
// 有新的连接并不代表这个通道就有数据,
// 这里将这个新的 SocketChannel 注册到 Selector,监听 OP_READ 事件,等待数据
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
// 有数据可读
// 上面一个 if 分支中注册了监听 OP_READ 事件的 SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
// 创建缓存空间
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int num = socketChannel.read(readBuffer);
if (num > 0) {
// 处理进来的数据...
System.out.println("收到数据:" + new String(readBuffer.array()).trim());
ByteBuffer buffer = ByteBuffer.wrap("返回给客户端的数据...".getBytes());
socketChannel.write(buffer);
} else if (num == -1) {
// -1 代表连接已经关闭
socketChannel.close();
}
}
}
}
}catch(IOException e){
// key.cancel();
// channel.socket().close();
// channel.close();
return;
}
}
}
client
public class Client {
public static void main(String[] args) {
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost",8081));
// 发送请求
ByteBuffer buffer = ByteBuffer.wrap("你好吗".getBytes());
socketChannel.write(buffer);
// 获取server响应
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int num ;
if((num = socketChannel.read(readBuffer))> 0 ){
readBuffer.flip();
byte[] result = new byte[num];
readBuffer.get(result);
String demo = new String(result,"UTF-8");
System.out.println("返回值:"+demo);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
结果和分析
server
client