NIO与Socket笔记 :AIO的使用
AIO 也叫NIO2.0 是一种非阻塞异步的通信模式。在NIO的基础上引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。
AIO 并没有采用NIO的多路复用器,而是使用异步通道的概念。其read,write方法的返回类型都是Future对象。而Future模型是异步的,其核心思想是:去主函数等待时间。
小结:AIO模型中通过AsynchronousSocketChannel和AsynchronousServerSocketChannel完成套接字通道的实现。非阻塞,异步。
AsynchronousFileChannel 类的使用
AsynchronousFileChannel 类用于读取、写入和操作文件的异步通道 。
在通过调用此类定义的open()方法打开文件时,将创建一个异步文件通道。
该文件包 含可读写 的、可查询其当前大小的可变长度的字节序列 。
当写人字节超出其当前大小时 ,文件的大小会增加 。 文件的大小在截断时会减小 。
异步文件通道在文件 中没有当前位置 ,而是将文件位置指定给启动异步操作 的每个读 取和写入方法。
CompletionHandler被指定为参数,并被调用以消耗 1/0操作的结果。
此类 还定义了启动异步操作的读取和写入方法,并返回 Future 对象以表示操作的挂起结果 。
将来可用于检查操作是否已完成,等待完成,然后检索结果 。
获取此通道文件的独占锁
public final Future<FileLock> lock()方法的作用是获取此通道文件的独占锁。
此方法启 动一个操作以获取此通道的文件的独占锁 。 该方法返回一个表示操作的挂起结果的 Future 对象。
Future 的 get()方法在成功完成时返 回 FileLock。
获取通道文件给定区域的锁
public abstract Future<FileLock> lock(long position, long size, boolean shared)方法的作 用是获取此通道文件给定区域的锁 。
此方法启动 一个操作以获取此信道文件的给定区域的 锁。
参数 position 代表锁定区域的起始位置,必须是非负数 。 size 代表锁定区域的大小,必 须是非负数,并且 position + size 的结果必须是非负数 。 shared值为 true代表请求的是共享 锁,在这种情况下,此通道必须为读取(并可能写人)打开,如果请求排他锁,在这种情况 下,此通道必须为写入而打开(并且可能读取)。 返回值代表待定结果的 Future对象。
实现重量锁定
在两个进程对同一个文件的锁定范围有重叠时, 会出现阻塞的状态。
返回此通道文件当前大小与通道打开状态
public abstract long size()方法的作用是返回此通道文件的当前大小。
public boolean isOpen()方法的作用是判断通道是否呈打开的状态。
CompletionHandler 接 口的使用
public final <A> void lock(A attachment,CompletionHandler<FileLock,? superA> handler) 方法的作用是获取此通道文件的独占锁。
此方法启动一个操作以获取此通道文件的给定区 域的锁。
public void failed (Throwable exc, A attachment)方法调用时机
public void failed(Throwable exc, A attachment)方法被调用的时机是出现 I/O 操作异常时 。
执行指定范围的锁定与传入附件及整合接口
public abstract <A> void lock(long position,long size,boolean shared,A attachment,Cornpleti- onHandler<FileLock,? super A> handler)方法的作用是将
public abstract Future<FileLock> lock (long position, long size, boolean shared)方法和
public final < A > void Iock(A attachrnent,Cornp - letionHandler<FileLock,? super A> handler)
方法进行了整合 。
执行锁定与传入附件及整合接口 CompletionHandler
如果 public final <A> void lock(A attachment,CompletionHandler<FileLock,? super A> handler) 方法获得不到锁,则一直等待 。
lock (position, size, shared, attachment,CompletionHandler)方法的特点
如果 lock(position, size, shared, attachment,CompletionHandler)方法获得不到锁,则一 直等待。
读取数据方式 1
public abstract Future<lnteger> read(ByteBuffer dst, long position)方法的作用是从给定的 文件位置开始,
从该通道将字节序列读人给定的缓冲区。
此方法从给定的文件位置开始 , 将 从该通道的字节序列读取到给定的缓冲区。
此方法返回 Future对象。 如果给定位置大于或 等于在尝试读取时文件的大小,则 Future的 get()方法将返回读取的字节数或-1。
此方法的工作方式与 AsynchronousByteChannel.read(ByteBuffer)方法相同,只是从给 定文件位置开始读取字节。
如果给定的文件位置大于文件在读取时的大小,则不读取任何字 节。
参数 dst代表要将字节传输到的缓冲区。参数 position代表开始的文件位置,必须是非 负数。
读取数据方式 2
写入数据方式1
写入数据方式 2
AsynchronousServerSocketChannel和 AsynchronousSocketChannel 类的使用
AsynchronousServerSocketChannel类是面向流的侦昕套接字的异步通道。
1个 Asynchronous ServerSocketChannel 通道 是 通过调用此类的 open()方法 创 建的 。
新 创 建的 Asynchronous ServerSocketChannel 已打开但尚 未绑定 。
它可以 绑定 到 本 地地址 ,并 通过调用 bind()方法 来配置为侦昕连接。
一旦绑定, accept()方法被用来启动接受连接到通道的 Socket。
此类型的通道是线程安全的,可由多个并发线程使用,但在大多数情况下 ,在任何时 候都可以完成一个 accept操作。
AsynchronousSocketChannel 类是面向流的连接套接字的异步通道 。
使用 AsynchronousSocketChannel类的 open()方法创建的是未连接状态的 Asynchronous SocketChannel对象,
之后再使用 connect()方法将未连接的 AsynchronousSocketChannel变 成已连接的 AsynchronousSocketChannel对象,
详述如下:
1)创建 AsynchronousSocketChannel是通过调用此类定义的 open()方法,新创建的 AsynchronousSocketChannel 呈已打开但尚未连接的状态 。 当连接到 AsynchronousServerSocket Channel 的套接字时,将创建连接的 AsynchronousSocketChannel 对象 。 不可能为任意的、 预先存在的 Socket创建异步套接宇通道。
2 )通过调用 connect()方法将未连接的通道变成已连接,连接后该通道保持连接,直到 关闭 。 是否连接套接字通道可以通过调用其 getRemoteAddress()方法来确定。 尝试在未连接 的通道上调用 IO 操作将导致引发 NotYetConnectedException 异常 。
此类型 的通道可以 安全 地由 多个并发线程使用 。
它们支持并发读写,虽然最多一次读 取操作,并且一个写操作可以在任何时候未完成。
如果一个线程在上一个读操作完成之前启 动了 read操作,则会引发 ReadPendingException异常。
类似的,尝试在前一个写操作完成 之前启动一个写运算将会引发一个 WritePendingException 异常 。
代码样例:
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* AIO, 也叫 NIO2.0 是一种异步非阻塞的通信方式
* AIO 引入了异步通道的概念 AsynchronousServerSocketChannel和AsynchronousSocketChannel 其read和write方法返回值类型是Future对象。
*/
public class ITDragonAIOServer {
private ExecutorService executorService; // 线程池
private AsynchronousChannelGroup threadGroup; // 通道组
public AsynchronousServerSocketChannel asynServerSocketChannel; // 服务器通道
public void start(Integer port){
try {
// 1.创建一个缓存池
executorService = Executors.newCachedThreadPool();
// 2.创建通道组
threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
// 3.创建服务器通道
asynServerSocketChannel = AsynchronousServerSocketChannel.open(threadGroup);
// 4.进行绑定
asynServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("server start , port : " + port);
// 5.等待客户端请求
asynServerSocketChannel.accept(this, new ITDragonAIOServerHandler());
// 一直阻塞 不让服务器停止,真实环境是在tomcat下运行,所以不需要这行代码
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ITDragonAIOServer server = new ITDragonAIOServer();
server.start(8888);
}
}
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import com.itdragon.util.CalculatorUtil;
public class ITDragonAIOServerHandler implements CompletionHandler<AsynchronousSocketChannel, ITDragonAIOServer> {
private final Integer BUFFER_SIZE = 1024;
@Override
public void completed(AsynchronousSocketChannel asynSocketChannel, ITDragonAIOServer attachment) {
// 保证多个客户端都可以阻塞
attachment.asynServerSocketChannel.accept(attachment, this);
read(asynSocketChannel);
}
//读取数据
private void read(final AsynchronousSocketChannel asynSocketChannel) {
ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
asynSocketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer resultSize, ByteBuffer attachment) {
//进行读取之后,重置标识位
attachment.flip();
//获取读取的数据
String resultData = new String(attachment.array()).trim();
System.out.println("Server -> " + "收到客户端的数据信息为:" + resultData);
String response = resultData + " = " + CalculatorUtil.cal(resultData);
write(asynSocketChannel, response);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
// 写入数据
private void write(AsynchronousSocketChannel asynSocketChannel, String response) {
try {
// 把数据写入到缓冲区中
ByteBuffer buf = ByteBuffer.allocate(BUFFER_SIZE);
buf.put(response.getBytes());
buf.flip();
// 在从缓冲区写入到通道中
asynSocketChannel.write(buf).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ITDragonAIOServer attachment) {
exc.printStackTrace();
}
}
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.Random;
public class ITDragonAIOClient implements Runnable{
private static Integer PORT = 8888;
private static String IP_ADDRESS = "127.0.0.1";
private AsynchronousSocketChannel asynSocketChannel ;
public ITDragonAIOClient() throws Exception {
asynSocketChannel = AsynchronousSocketChannel.open(); // 打开通道
}
public void connect(){
asynSocketChannel.connect(new InetSocketAddress(IP_ADDRESS, PORT)); // 创建连接 和NIO一样
}
public void write(String request){
try {
asynSocketChannel.write(ByteBuffer.wrap(request.getBytes())).get();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
asynSocketChannel.read(byteBuffer).get();
byteBuffer.flip();
byte[] respByte = new byte[byteBuffer.remaining()];
byteBuffer.get(respByte); // 将缓冲区的数据放入到 byte数组中
System.out.println(new String(respByte,"utf-8").trim());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
}
}
public static void main(String[] args) throws Exception {
for (int i = 0; i < 10; i++) {
ITDragonAIOClient myClient = new ITDragonAIOClient();
myClient.connect();
new Thread(myClient, "myClient").start();
String []operators = {"+","-","*","/"};
Random random = new Random(System.currentTimeMillis());
String expression = random.nextInt(10)+operators[random.nextInt(4)]+(random.nextInt(10)+1);
myClient.write(expression);
}
}
}
NIO技术属于同步非阻塞