java之IO模型(BIO NIO AIO)

1.基本概念

BIO: Blocking IO,阻塞式IO,线程发起io请求后,一直阻塞(阻塞io),直到数据就绪后,用户线程将数据写入socket空间,或从socket空间读取数据(同步)

java之IO模型(BIO NIO AIO)

NIO: Non-blocking IO,非阻塞式IO,需要用户线程定时轮训,去检查IO数据是否就绪,占用应用程序线程资源。IO多路复用模型中,将检查IO数据是否就绪的任务,交给系统级别的select或poll模型,由系统进行监控,减轻用户线程负担。

java之IO模型(BIO NIO AIO)

AIO: Asouync IO,异步IO,线程发起io请求后,立即返回(非阻塞io),当数据读写完成后,OS通知用户线程(异步)。这里数据写入socket空间,或从socket空间读取数据到用户空间由OS完成,用户线程无需介入,所以也就不会阻塞用户线程,即异步。AIO基于时间驱动思想,采用proactor模式。数据完成后,由os主动通知应用程序,通过epoll实现,节省了NIO中selector循环遍历检测数据就绪的资源开销。同时,数据copy操作(用户空间<->socket空间)是由os完成的,无需应用程序参与,大大提高应用程序效率。

java之IO模型(BIO NIO AIO)

2.优缺点

查了一些资料,都是用网络编程为例来说明BIO、NIO、AIO的区别,这里就不再敲代码了,直接说说彼此的优缺点好了。

BIO:优点是编码简单,容易上手、调试。缺点则是因为多线程本身的问题,①每个线程都要分配内存空间(最少128K),②线程的切换需要耗费时间。所以,一是线程数量有限;二是线程多了会严重降低响应时间。当然也可以使用线程池,但又涉及到队列任务积压问题。

NIO:刚好跟BIO相反。优点是速度快,耗费资源少。缺点是编码复杂,难以上手、调试。

AIO:事件驱动(回调),性能高。

话不多说,上代码:

BIO 客户端:    

public class BIOClient {
    public static void main(String[] args) throws Exception {
        Socket client = new Socket(HostInfo.HOST_NAME,HostInfo.PORT) ;  // 定义连接的主机信息
        Scanner scan = new Scanner(client.getInputStream()) ;   // 获取服务器端的响应数据
        scan.useDelimiter("\n") ;
        PrintStream out = new PrintStream(client.getOutputStream()) ; // 向服务器端发送信息内容
        boolean flag = true ; // 交互的标记
        while(flag) {
            String inputData = InputUtils.getInputString("请输入要发送的内容:").trim() ;
            out.println("【客户端】:"+inputData); // 把数据发送到服务器端上
            if(scan.hasNext()) {
                String str = scan.next().trim() ;
                System.out.println(str);
            }
            if ("byebye".equalsIgnoreCase(inputData)) {
                flag = false ;
            }
        }
        client.close();
    }
}

BIO 服务端:

public class BIOServer {
    public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(HostInfo.PORT) ;// 设置监听端口
        System.out.println("服务器端已经启动,监听的端口为:" + HostInfo.PORT);
        boolean flag = true ;
        ExecutorService executorService = Executors.newFixedThreadPool(10) ;
        while(flag) {
            Socket client = serverSocket.accept() ;
            executorService.submit(new EchoClientHandler(client)) ;
        }
        executorService.shutdown() ;
        serverSocket.close() ;
    }

    private static class EchoClientHandler implements Runnable {
        private Socket client ; // 每一个客户端都需要启动一个任务(task)来执行。
        private Scanner scanner ;
        private PrintStream out ;
        private boolean flag = true ;   // 循环标记
        public EchoClientHandler(Socket client) {
            this.client = client ; // 保存每一个客户端操作
            try {
                this.scanner = new Scanner(this.client.getInputStream()) ;
                this.scanner.useDelimiter("\n") ; // 设置换行符
                this.out = new PrintStream(this.client.getOutputStream()) ;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public void run() {
            while(this.flag) {
                if (this.scanner.hasNext()) {   // 现在有数据进行输入
                    String val = this.scanner.next().trim() ; // 去掉多余的空格内容
                    System.err.println(val);
                    if("byebye".equalsIgnoreCase(val)) {
                        this.out.println("ByeByeByte...");
                        this.flag = false ;
                    } else {
                        String inputData = InputUtils.getInputString("请输入要发送的内容:").trim() ;
                        out.println("【服务端】" + inputData);
                    }
                }
            }
            this.scanner.close();
            this.out.close();
            try {
                this.client.close();
            } catch (IOException e) {
            }
        }
    }
}

注:通过开启线程池的方式,管理服务端,实现伪异步

NIO 客户端:

public class NIOEchoClient {
    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress(HostInfo.HOST_NAME,HostInfo.PORT));
        ByteBuffer buffer = ByteBuffer.allocate(50);
        boolean flag = true;
        while (flag) {
            buffer.clear();
            String inputString = InputUtils.getInputString("请输入要发送的内容:");
            buffer.put(inputString.getBytes());
            buffer.flip();
            socketChannel.write(buffer);

            //读取数据前清除缓存
            buffer.clear();
            int readCount = socketChannel.read(buffer);
            buffer.flip();
            System.out.println(new String(buffer.array(),0,readCount));
            if ("byebye".equals(inputString)) {
                flag = false;
            }
        }
        socketChannel.close();
    }
}

NIO 服务端:

public class NIOServer {
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(50);
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //设置模式为非阻塞
        serverSocketChannel.configureBlocking(false);
        //绑定一个网络服务端口
        serverSocketChannel.bind(new InetSocketAddress(HostInfo.PORT));

        //设置一个多路复用器,来作为一个选择器出现,目的是管理所有的channel
        Selector selector = Selector.open();
        //将所有的channel注册到selector
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务器已经启动,监听端口为:" + HostInfo.PORT);
        //NIO用的是轮询模式,每当发现有客户端连接时,开启一个线程,由线程池管理
        int keySelector = 0;
        while ((keySelector  = selector.select()) > 0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if(key.isAcceptable()){
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    if (socketChannel != null) {
                        executorService.execute(new NIOEchoHandler(socketChannel));
                    }
                }
            }
        }
    }

    private static class NIOEchoHandler implements Runnable {
        private SocketChannel socketChannel;
        private boolean flag = true;
        public NIOEchoHandler(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @SneakyThrows
        @Override
        public void run() {
            ByteBuffer buffer = ByteBuffer.allocate(50);
            while (this.flag) {
                buffer.clear();
                try {
                    int readCount = this.socketChannel.read(buffer);
                    String readMessage = new String(buffer.array(), 0, readCount).trim();
                    System.out.println("【客户端】:" + readMessage);
                    String writeMessage = "【Echo】:" + readMessage + "\n";
                    if ("byebye".equals(readMessage)) {
                        writeMessage = "【Exit】:拜拜,下次见!";
                        this.flag = false;
                    }
                    buffer.clear();
                    buffer.put(writeMessage.getBytes());
                    buffer.flip();
                    this.socketChannel.write(buffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            this.socketChannel.close();
        }
    }
}

AIO 客户端:

class ClientReadHandler implements CompletionHandler<Integer,ByteBuffer> {
    private AsynchronousSocketChannel clientChannel ;
    private CountDownLatch latch ;
    public ClientReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
        this.clientChannel = clientChannel ;
        this.latch = latch ;
    }
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        buffer.flip() ;
        String readMessage = new String(buffer.array(),0,buffer.remaining()) ;
        System.out.println(readMessage); // 输出读取内容
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.latch.countDown();
    }
}

class ClientWriteHandler implements CompletionHandler<Integer,ByteBuffer> {
    private AsynchronousSocketChannel clientChannel ;
    private CountDownLatch latch ;
    public ClientWriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
        this.clientChannel = clientChannel ;
        this.latch = latch ;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if(buffer.hasRemaining()) {
            this.clientChannel.write(buffer,buffer,this);
        } else {
            ByteBuffer readBuffer = ByteBuffer.allocate(100) ; // 读取服务端回应
            this.clientChannel.read(readBuffer,readBuffer,new ClientReadHandler(this.clientChannel,this.latch)) ;
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.latch.countDown();
    }
}

class AIOClientThread implements Runnable {
    private AsynchronousSocketChannel clientChannel;
    private CountDownLatch latch;

    public AIOClientThread() {
        try {
            this.clientChannel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.clientChannel.connect(new InetSocketAddress(HostInfo.HOST_NAME, HostInfo.PORT));
        this.latch = new CountDownLatch(1);
    }

    @Override
    public void run() {
        try {
            this.latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public boolean sendMessage(String msg) {
        ByteBuffer buffer = ByteBuffer.allocate(100) ;
        buffer.put(msg.getBytes()) ;
        buffer.flip() ;
        this.clientChannel.write(buffer,buffer,new ClientWriteHandler(this.clientChannel,this.latch));
        if("byebye".equalsIgnoreCase(msg)) {
            return false ;
        }
        return true ;
    }
}

public class AIOEchoClient {
    public static void main(String[] args) {
        AIOClientThread client = new AIOClientThread() ;
        new Thread(client).start();
        while(client.sendMessage(InputUtils.getInputString("请输入要发送的内容:"))) {
            ;
        }
    }
}

AIO 服务端:

public class AIOEchoServer {
    public static void main(String[] args) {
        new Thread(new AIOServerThread()).start();
    }
}

class AIOServerThread implements Runnable{
    private AsynchronousServerSocketChannel serverSocketChannel;
    private CountDownLatch latch;
    public AIOServerThread(){
        try {
            this.serverSocketChannel = AsynchronousServerSocketChannel.open();
            this.latch = new CountDownLatch(1);
            this.serverSocketChannel.bind(new InetSocketAddress(HostInfo.HOST_NAME,HostInfo.PORT));
            System.out.println("服务器启动成功,在" + HostInfo.PORT + "端口上监听服务 ...");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public AsynchronousServerSocketChannel getChannel(){
        return this.serverSocketChannel;
    }
    public CountDownLatch getLatch(){
        return this.latch;
    }



    @Override
    public void run() {
        this.serverSocketChannel.accept(this,new AcceptHandler());
        try {
            this.latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AIOServerThread> {

        @Override
        public void completed(AsynchronousSocketChannel result, AIOServerThread attachment) {
            attachment.getChannel().accept(attachment,this);
            ByteBuffer buffer = ByteBuffer.allocate(50);
            result.read(buffer,buffer,new EchoHandler(result));
        }

        @Override
        public void failed(Throwable exc, AIOServerThread attachment) {
            System.out.println("服务器端客户端连接失败 ...");
            attachment.getLatch().countDown();
        }
    }
}
public class EchoHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel clientChannel;
    private boolean exit = false;
    public EchoHandler(AsynchronousSocketChannel clientChannel) {
        this.clientChannel = clientChannel;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        buffer.flip();
        String reasMessage = new String(buffer.array(), 0, buffer.remaining());
        String writeMessage = "【Echo】:" + reasMessage + "\n";
        if ("byebye".equals(reasMessage)) {
            writeMessage = "【Exit】拜拜,下次见";
            this.exit = true;
        }
        this.writeMessage(writeMessage);
    }

    private void writeMessage(String writeMessage) {
        final ByteBuffer buffer = ByteBuffer.allocate(50);
        buffer.put(writeMessage.getBytes());
        buffer.flip();
        this.clientChannel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buf) {
                if(buf.hasRemaining()){
                    EchoHandler.this.clientChannel.write(buffer,buffer,this);
                }else {
                    if (EchoHandler.this.exit == false) {
                        ByteBuffer readerBuffer = ByteBuffer.allocate(50);
                        EchoHandler.this.clientChannel.write(readerBuffer,readerBuffer,new EchoHandler(EchoHandler.this.clientChannel));
                    }
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    EchoHandler.this.clientChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void failed(Throwable exc, ByteBuffer buffer) {
        try {
            EchoHandler.this.clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}