java之IO模型(BIO NIO AIO)
1.基本概念
BIO: Blocking IO,阻塞式IO,线程发起io请求后,一直阻塞(阻塞io),直到数据就绪后,用户线程将数据写入socket空间,或从socket空间读取数据(同步)
NIO: Non-blocking IO,非阻塞式IO,需要用户线程定时轮询,去检查IO数据是否就绪,占用应用程序线程资源。IO多路复用模型中,将检查IO数据是否就绪的任务,交给系统级别的select或poll模型,由系统进行监控,减轻用户线程负担。
AIO: Asynchronous IO,异步IO,线程发起io请求后,立即返回(非阻塞io),当数据读写完成后,OS通知用户线程(异步)。这里数据写入socket空间,或从socket空间读取数据到用户空间由OS完成,用户线程无需介入,所以也就不会阻塞用户线程,即异步。AIO基于事件驱动思想,采用proactor模式。数据完成后,由os主动通知应用程序(在Linux上,JDK底层通过epoll实现),节省了NIO中selector循环遍历检测数据就绪的资源开销。同时,数据copy操作(用户空间<->socket空间)是由os完成的,无需应用程序参与,大大提高应用程序效率。
2.优缺点
查了一些资料,都是用网络编程为例来说明BIO、NIO、AIO的区别,这里就不再敲代码了,直接说说彼此的优缺点好了。
BIO:优点是编码简单,容易上手、调试。缺点则是因为多线程本身的问题,①每个线程都要分配内存空间(最少128K),②线程的切换需要耗费时间。所以,一是线程数量有限;二是线程多了会严重降低响应时间。当然也可以使用线程池,但又涉及到队列任务积压问题。
NIO:刚好跟BIO相反。优点是速度快,耗费资源少。缺点是编码复杂,难以上手、调试。
AIO:事件驱动(回调),性能高。
话不多说,上代码:
BIO 客户端:
public class BIOClient { public static void main(String[] args) throws Exception { Socket client = new Socket(HostInfo.HOST_NAME,HostInfo.PORT) ; // 定义连接的主机信息 Scanner scan = new Scanner(client.getInputStream()) ; // 获取服务器端的响应数据 scan.useDelimiter("\n") ; PrintStream out = new PrintStream(client.getOutputStream()) ; // 向服务器端发送信息内容 boolean flag = true ; // 交互的标记 while(flag) { String inputData = InputUtils.getInputString("请输入要发送的内容:").trim() ; out.println("【客户端】:"+inputData); // 把数据发送到服务器端上 if(scan.hasNext()) { String str = scan.next().trim() ; System.out.println(str); } if ("byebye".equalsIgnoreCase(inputData)) { flag = false ; } } client.close(); } }
BIO 服务端:
public class BIOServer { public static void main(String[] args) throws Exception{ ServerSocket serverSocket = new ServerSocket(HostInfo.PORT) ;// 设置监听端口 System.out.println("服务器端已经启动,监听的端口为:" + HostInfo.PORT); boolean flag = true ; ExecutorService executorService = Executors.newFixedThreadPool(10) ; while(flag) { Socket client = serverSocket.accept() ; executorService.submit(new EchoClientHandler(client)) ; } executorService.shutdown() ; serverSocket.close() ; } private static class EchoClientHandler implements Runnable { private Socket client ; // 每一个客户端都需要启动一个任务(task)来执行。 private Scanner scanner ; private PrintStream out ; private boolean flag = true ; // 循环标记 public EchoClientHandler(Socket client) { this.client = client ; // 保存每一个客户端操作 try { this.scanner = new Scanner(this.client.getInputStream()) ; this.scanner.useDelimiter("\n") ; // 设置换行符 this.out = new PrintStream(this.client.getOutputStream()) ; } catch (IOException e) { e.printStackTrace(); } } public void run() { while(this.flag) { if (this.scanner.hasNext()) { // 现在有数据进行输入 String val = this.scanner.next().trim() ; // 去掉多余的空格内容 System.err.println(val); if("byebye".equalsIgnoreCase(val)) { this.out.println("ByeByeByte..."); this.flag = false ; } else { String inputData = InputUtils.getInputString("请输入要发送的内容:").trim() ; out.println("【服务端】" + inputData); } } } this.scanner.close(); this.out.close(); try { this.client.close(); } catch (IOException e) { } } } }
注:通过开启线程池的方式,管理服务端,实现伪异步
NIO 客户端:
public class NIOEchoClient { public static void main(String[] args) throws Exception { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress(HostInfo.HOST_NAME,HostInfo.PORT)); ByteBuffer buffer = ByteBuffer.allocate(50); boolean flag = true; while (flag) { buffer.clear(); String inputString = InputUtils.getInputString("请输入要发送的内容:"); buffer.put(inputString.getBytes()); buffer.flip(); socketChannel.write(buffer); //读取数据前清除缓存 buffer.clear(); int readCount = socketChannel.read(buffer); buffer.flip(); System.out.println(new String(buffer.array(),0,readCount)); if ("byebye".equals(inputString)) { flag = false; } } socketChannel.close(); } }
NIO 服务端:
public class NIOServer { public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newFixedThreadPool(50); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置模式为非阻塞 serverSocketChannel.configureBlocking(false); //绑定一个网络服务端口 serverSocketChannel.bind(new InetSocketAddress(HostInfo.PORT)); //设置一个多路复用器,来作为一个选择器出现,目的是管理所有的channel Selector selector = Selector.open(); //将所有的channel注册到selector serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务器已经启动,监听端口为:" + HostInfo.PORT); //NIO用的是轮询模式,每当发现有客户端连接时,开启一个线程,由线程池管理 int keySelector = 0; while ((keySelector = selector.select()) > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if(key.isAcceptable()){ SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { executorService.execute(new NIOEchoHandler(socketChannel)); } } } } } private static class NIOEchoHandler implements Runnable { private SocketChannel socketChannel; private boolean flag = true; public NIOEchoHandler(SocketChannel socketChannel) { this.socketChannel = socketChannel; } @SneakyThrows @Override public void run() { ByteBuffer buffer = ByteBuffer.allocate(50); while (this.flag) { buffer.clear(); try { int readCount = this.socketChannel.read(buffer); String readMessage = new String(buffer.array(), 0, readCount).trim(); System.out.println("【客户端】:" + readMessage); String writeMessage = "【Echo】:" + readMessage + "\n"; if ("byebye".equals(readMessage)) { writeMessage = "【Exit】:拜拜,下次见!"; this.flag = false; } buffer.clear(); buffer.put(writeMessage.getBytes()); buffer.flip(); this.socketChannel.write(buffer); } catch (IOException e) { e.printStackTrace(); } } this.socketChannel.close(); } } }
AIO 客户端:
class ClientReadHandler implements CompletionHandler<Integer,ByteBuffer> { private AsynchronousSocketChannel clientChannel ; private CountDownLatch latch ; public ClientReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel ; this.latch = latch ; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip() ; String readMessage = new String(buffer.array(),0,buffer.remaining()) ; System.out.println(readMessage); // 输出读取内容 } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } this.latch.countDown(); } } class ClientWriteHandler implements CompletionHandler<Integer,ByteBuffer> { private AsynchronousSocketChannel clientChannel ; private CountDownLatch latch ; public ClientWriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel ; this.latch = latch ; } @Override public void completed(Integer result, ByteBuffer buffer) { if(buffer.hasRemaining()) { this.clientChannel.write(buffer,buffer,this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(100) ; // 读取服务端回应 this.clientChannel.read(readBuffer,readBuffer,new ClientReadHandler(this.clientChannel,this.latch)) ; } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } this.latch.countDown(); } } class AIOClientThread implements Runnable { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public AIOClientThread() { try { this.clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } this.clientChannel.connect(new InetSocketAddress(HostInfo.HOST_NAME, HostInfo.PORT)); this.latch = new CountDownLatch(1); } @Override public void run() { try { this.latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public 
boolean sendMessage(String msg) { ByteBuffer buffer = ByteBuffer.allocate(100) ; buffer.put(msg.getBytes()) ; buffer.flip() ; this.clientChannel.write(buffer,buffer,new ClientWriteHandler(this.clientChannel,this.latch)); if("byebye".equalsIgnoreCase(msg)) { return false ; } return true ; } } public class AIOEchoClient { public static void main(String[] args) { AIOClientThread client = new AIOClientThread() ; new Thread(client).start(); while(client.sendMessage(InputUtils.getInputString("请输入要发送的内容:"))) { ; } } }
AIO 服务端:
/** Entry point of the AIO echo server. */
public class AIOEchoServer {

    /**
     * Starts the server thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new Thread(new AIOServerThread()).start();
    }
}

/**
 * Owns the asynchronous server channel. {@link #run()} issues the first accept and then
 * blocks on a latch until accepting fails; the accept handler re-arms itself for every
 * new connection. {@code HostInfo} is a project helper that is not part of this listing.
 */
class AIOServerThread implements Runnable {

    private final AsynchronousServerSocketChannel serverSocketChannel;
    private final CountDownLatch latch;

    /**
     * Opens the server channel and binds it to {@code HostInfo.HOST_NAME:HostInfo.PORT}.
     *
     * @throws IllegalStateException if the channel cannot be opened or bound; failing
     *                               here replaces a later NullPointerException or
     *                               NotYetBoundException in {@link #run()}
     */
    public AIOServerThread() {
        AsynchronousServerSocketChannel channel = null;
        try {
            channel = AsynchronousServerSocketChannel.open();
            channel.bind(new InetSocketAddress(HostInfo.HOST_NAME, HostInfo.PORT));
        } catch (IOException e) {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // The bind failure is the one worth reporting.
                }
            }
            throw new IllegalStateException("Unable to start the AIO server", e);
        }
        this.serverSocketChannel = channel;
        this.latch = new CountDownLatch(1);
        System.out.println("服务器启动成功,在" + HostInfo.PORT + "端口上监听服务 ...");
    }

    /** @return the server channel, used by the accept handler to accept the next client */
    public AsynchronousServerSocketChannel getChannel() {
        return this.serverSocketChannel;
    }

    /** @return the latch whose release stops the server thread */
    public CountDownLatch getLatch() {
        return this.latch;
    }

    @Override
    public void run() {
        this.serverSocketChannel.accept(this, new AcceptHandler());
        try {
            this.latch.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            // The server is going down: release the listening port.
            try {
                this.serverSocketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Handles one accepted connection and immediately re-arms the accept for the next one. */
    private static class AcceptHandler
            implements CompletionHandler<AsynchronousSocketChannel, AIOServerThread> {

        @Override
        public void completed(AsynchronousSocketChannel result, AIOServerThread attachment) {
            // Accept the next client first so that connections are not serialized behind this one.
            attachment.getChannel().accept(attachment, this);
            ByteBuffer buffer = ByteBuffer.allocate(50);
            result.read(buffer, buffer, new EchoHandler(result));
        }

        @Override
        public void failed(Throwable exc, AIOServerThread attachment) {
            System.out.println("服务器端客户端连接失败 ...");
            attachment.getLatch().countDown();
        }
    }
}
/**
 * Server-side completion handler for one read from a client.
 *
 * <p>Echoes the received text back with an "Echo" prefix. Once the reply has been
 * written completely it starts the next asynchronous read, or closes the connection if
 * the client said "byebye" or has disconnected.
 */
public class EchoHandler implements CompletionHandler<Integer, ByteBuffer> {

    private final AsynchronousSocketChannel clientChannel;
    private boolean exit = false; // set once the client sent the farewell message

    /**
     * @param clientChannel the client connection this handler reads from and replies to
     */
    public EchoHandler(AsynchronousSocketChannel clientChannel) {
        this.clientChannel = clientChannel;
    }

    /**
     * Called when a read has completed.
     *
     * @param result number of bytes read, or -1 if the client closed the connection
     * @param buffer buffer holding the received bytes (still in write mode)
     */
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (result < 0) {
            // End of stream: nothing to echo and nothing more to read.
            closeChannel();
            return;
        }
        buffer.flip();
        String readMessage = new String(buffer.array(), 0, buffer.remaining());
        String writeMessage = "【Echo】:" + readMessage + "\n";
        if ("byebye".equals(readMessage)) {
            writeMessage = "【Exit】拜拜,下次见";
            this.exit = true;
        }
        this.writeMessage(writeMessage);
    }

    /**
     * Writes the reply and, when it is fully sent, either reads the next message or
     * closes the connection.
     */
    private void writeMessage(String writeMessage) {
        // Wrapping sizes the buffer to the reply; the reply is longer than the request
        // (prefix added) and overflowed the former fixed 50-byte buffer.
        ByteBuffer buffer = ByteBuffer.wrap(writeMessage.getBytes());
        this.clientChannel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buf) {
                if (buf.hasRemaining()) {
                    // Partial write: send the rest of the same buffer.
                    EchoHandler.this.clientChannel.write(buf, buf, this);
                } else if (EchoHandler.this.exit) {
                    // The farewell has been answered; the conversation is over.
                    closeChannel();
                } else {
                    // Wait for the next message. This must be a read: writing the fresh
                    // buffer would send 50 zero bytes and the server would never listen again.
                    ByteBuffer readBuffer = ByteBuffer.allocate(50);
                    EchoHandler.this.clientChannel.read(readBuffer, readBuffer,
                            new EchoHandler(EchoHandler.this.clientChannel));
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                closeChannel();
            }
        });
    }

    @Override
    public void failed(Throwable exc, ByteBuffer buffer) {
        closeChannel();
    }

    /** Closes the client connection, reporting (but not propagating) a failed close. */
    private void closeChannel() {
        try {
            this.clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}