Java的Future模式
1 什么是future模式
Future 模式类似于我们发送Ajax请求,这个请求是异步的,用户无需一直做等待,可以先做其他事情,等请求获取到数据,在继续做之前的事情。或者也类似于我们做饭,但是没有厨具,这个时候我们在网上下了订单,网站会提示给你订单已经受理了,但是我们不用一直等待厨具给我们送过来,在等待这个期间,我们可以先买菜,最后厨具到了一起做饭。
那如何实现呢?
首先,客户端Future向服务器发送请求,于此同时,服务器并不会返回真实的数据,他会给你返回一个包装类FutureData,然后服务器开启一个线程请求真实的数据RealData,真实的数据获取完毕,给客户端一个通知,或者客户端可以阻塞其他线程等待真正资源加载完毕。
我们定义一个数据接口Data,无论真实数据RealData和包装类数据FutureData都有继承这个接口。
public interface Data {
String getRequest();
}
接下来我们看一下main方法,它通过调用FutureClient去取获想要的数据RealData,RealData继承Data接口。
public class Main { public static void main(String[] args) throws InterruptedException { FutureClient fc = new FutureClient(); Data data = fc.request("请求参数"); System.out.println("data is " + data); System.out.println("请求发送成功!"); System.out.println("做其他的事情..."); String result = data.getRequest(); System.out.println(result); } }
然后我们看一下FutureClient 是如何帮助我们获取数据的,所有我们看上面的main方法中fc.request("请求参数")中返回的并不是真实的数据,返回的是一个包装类,同时它将启动一个新的线程取请求真实的数据。
public class FutureClient { public Data request(final String queryStr){ //1 我想要一个代理对象(Data接口的实现类)先返回给发送请求的客户端,告诉他请求已经接收到,可以做其他的事情 final FutureData futureData = new FutureData(); //2 启动一个新的线程,去加载真实的数据,传递给这个代理对象 new Thread(new Runnable() { @Override public void run() { //3 这个新的线程可以去慢慢的加载真实对象,然后传递给代理对象 RealData realData = new RealData(queryStr); futureData.setRealData(realData); } }).start(); return futureData; } }
接下来我们看一下FutureData的实现,他会等待RealData请求完毕,然后发出通知获取真实数据。从而在main方法中的data.getRequest();就可以获取真实的执行结果,这个方法是阻塞线程执行的,因为我们看到在FutureData方法中有wait和notify,只有真正的结果获取到,才能获取到结果。
public class FutureData implements Data{ private RealData realData ; private boolean isReady = false; public synchronized void setRealData(RealData realData) { //如果已经装载完毕了,就直接返回 if(isReady){ return; } //如果没装载,进行装载真实对象 this.realData = realData; isReady = true; //进行通知 notify(); } @Override public synchronized String getRequest() { //如果没装载好 程序就一直处于阻塞状态 while(!isReady){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } //装载好直接获取数据即可 return this.realData.getRequest(); } }
接下来我们看一下RealData的实现,真正获取数据的实现。
public class RealData implements Data{ private String result ; public RealData (String queryStr){ System.out.println("根据" + queryStr + "进行请求真实数据,这是一个很耗时的操作.."); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("操作完毕,获取结果"); result = "请求真实结果"; } @Override public String getRequest() { return result; } }
执行结果如下:根据请求参数进行请求真实数据,这是一个很耗时的操作..
请求发送成功!
做其他的事情...
操作完毕,获取结果
获取到真实结果
2 JDK自带实现类--FutureTask
模拟代码:
public class MyFutureTask { public static void main(String[] args) throws InterruptedException, ExecutionException { long startTime = System.currentTimeMillis(); Callable<RealData> myTask = new Callable<RealData>() { @Override public RealData call() throws Exception { return new RealData("获取真实结果"); } }; FutureTask<RealData> task = new FutureTask<RealData>(myTask); new Thread(task).start(); System.out.println("请求发送成功!"); System.out.println("做其他的事情..."); Thread.sleep(2000); if (!task.isDone()) { // 是否处理完 System.out.println("请耐心等待结果,"); } RealData realData = task.get(); System.out.println(realData.getRequest()); } }
分析一下上面的代码,
真实数据的获取被封装到Callable的call方法中。
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
1)大体说下Callable和runnable区别:
2)创建一个FutureTask对象,将Callable作为参数传入,然后把这个对象作为Runnable,作为参数,重新起一个线程。
FutureTask继承RunnabelFuture,Ruannable继承Runnable和Future,其中Future接口定义了如下方法:
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
下面是Future接口定义的方法
boolean cancel(boolean mayInterruptIfRunning); //取消执行,会抛出异常
boolean isCancelled(); //判断是否取消
boolean isDone(); //判断是否执行完成
V get() throws InterruptedException, ExecutionException;//获取执行结果,会阻塞住线程
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException; //获取执行结果,第一参数是时间,第二个是时间单位,
//如果在指定时间未获取成功,会抛出TimeoutException异常
3)从上面的方法中我们看到FutureTask同时继承了Runnable方法,所以也有run方法,如下可以发现run方法主要调用了call方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
4) 在run方法的开头判断了state,那state是什么?如下
/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
api注释大概意思是任务的初始状态是NEW,当获取到结果呈现COMPLETING状态,它是瞬时的,当调用者打断时,它是INTERRUPTING,也是瞬时态。
NORMAL顺利执行完,CANCELED被取消,INTERRUPTTED被打断,EXCEPTIONAL异常。状态转换看上面灰色部分。
5)get方法,主要调用awaitDone方法,主要是等待执行完成,如果出现异常或者超时会终止。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
3 应用
1)需要并行执行,任务不多情况下
2)高并发情况下数创建数据库连接,我们可以通过一个线程创建一个连接,并将这个连接放入Callable的call方法中,
这样其他线程通过调用get方法就可以获取该连接,避免同时创建多个connection,浪费资源