Java的Future模式

1  什么是future模式

     Future 模式类似于我们发送Ajax请求,这个请求是异步的,用户无需一直做等待,可以先做其他事情,等请求获取到数据,在继续做之前的事情。或者也类似于我们做饭,但是没有厨具,这个时候我们在网上下了订单,网站会提示给你订单已经受理了,但是我们不用一直等待厨具给我们送过来,在等待这个期间,我们可以先买菜,最后厨具到了一起做饭。

Java的Future模式

    那如何实现呢?

    首先,客户端Future向服务器发送请求,于此同时,服务器并不会返回真实的数据,他会给你返回一个包装类FutureData,然后服务器开启一个线程请求真实的数据RealData,真实的数据获取完毕,给客户端一个通知,或者客户端可以阻塞其他线程等待真正资源加载完毕。

    我们定义一个数据接口Data,无论真实数据RealData和包装类数据FutureData都有继承这个接口。

public interface Data {
   String getRequest();
}

    接下来我们看一下main方法,它通过调用FutureClient去取获想要的数据RealData,RealData继承Data接口。

public class Main {

   public static void main(String[] args) throws InterruptedException {
      
      FutureClient fc = new FutureClient();
      Data data = fc.request("请求参数");
      System.out.println("data is " + data);
      System.out.println("请求发送成功!");
      System.out.println("做其他的事情...");
      
      String result = data.getRequest();
      System.out.println(result);
      
   }
}  

    然后我们看一下FutureClient 是如何帮助我们获取数据的,所有我们看上面的main方法中fc.request("请求参数")中返回的并不是真实的数据,返回的是一个包装类,同时它将启动一个新的线程取请求真实的数据。

public class FutureClient {

   public Data request(final String queryStr){
      //1 我想要一个代理对象(Data接口的实现类)先返回给发送请求的客户端,告诉他请求已经接收到,可以做其他的事情
      final FutureData futureData = new FutureData();
      //2 启动一个新的线程,去加载真实的数据,传递给这个代理对象
      new Thread(new Runnable() {
         @Override
         public void run() {
            //3 这个新的线程可以去慢慢的加载真实对象,然后传递给代理对象
            RealData realData = new RealData(queryStr);
            futureData.setRealData(realData);
         }
      }).start();  
      return futureData;
   }
}

     接下来我们看一下FutureData的实现,他会等待RealData请求完毕,然后发出通知获取真实数据。从而在main方法中的data.getRequest();就可以获取真实的执行结果,这个方法是阻塞线程执行的,因为我们看到在FutureData方法中有wait和notify,只有真正的结果获取到,才能获取到结果。

public class FutureData implements Data{

   private RealData realData ;
   
   private boolean isReady = false;
   public synchronized void setRealData(RealData realData) {
      //如果已经装载完毕了,就直接返回
      if(isReady){
         return;
      }
      //如果没装载,进行装载真实对象
      this.realData = realData;
      isReady = true;
      //进行通知
      notify();
   }
   @Override
   public synchronized String getRequest() {
      //如果没装载好 程序就一直处于阻塞状态
      while(!isReady){
         try {
            wait();
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
      }
      //装载好直接获取数据即可
      return this.realData.getRequest();
   }
}  

    接下来我们看一下RealData的实现,真正获取数据的实现。

public class RealData implements Data{

   private String result ;
   
   public RealData (String queryStr){
      System.out.println("根据" + queryStr + "进行请求真实数据,这是一个很耗时的操作..");
      try {
         Thread.sleep(5000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      System.out.println("操作完毕,获取结果");
      result = "请求真实结果";
   }
   
   @Override
   public String getRequest() {
      return result;
   }

}

执行结果如下:
根据请求参数进行请求真实数据,这是一个很耗时的操作..
请求发送成功!
做其他的事情...
操作完毕,获取结果
获取到真实结果

2  JDK自带实现类--FutureTask

模拟代码:

public class MyFutureTask {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long startTime = System.currentTimeMillis();
        Callable<RealData> myTask = new Callable<RealData>() {

            @Override
            public RealData call() throws Exception {
                return new RealData("获取真实结果");
            }
        };
        FutureTask<RealData> task = new FutureTask<RealData>(myTask);
        new Thread(task).start();
        System.out.println("请求发送成功!");
        System.out.println("做其他的事情...");
        Thread.sleep(2000);  

        if (!task.isDone()) {  // 是否处理完
            System.out.println("请耐心等待结果,");
        }
        RealData realData = task.get();
        System.out.println(realData.getRequest());
    }
}

分析一下上面的代码,

真实数据的获取被封装到Callable的call方法中。

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

1)大体说下Callable和runnable区别:

(1)Callable接口重写的方法是call(),Runnable接口重写的方法是run()
(2)Callable的任务执行后可返回值,并且call方法可以抛出异常;而Runnable的任务是不能返回值,run方法不能抛出异常。

2)创建一个FutureTask对象,将Callable作为参数传入,然后把这个对象作为Runnable,作为参数,重新起一个线程。

FutureTask继承RunnabelFuture,Ruannable继承Runnable和Future,其中Future接口定义了如下方法:

public class FutureTask<V> implements RunnableFuture<V> 
public interface RunnableFuture<V> extends Runnable, Future<V> 

    下面是Future接口定义的方法


boolean cancel(boolean mayInterruptIfRunning); //取消执行,会抛出异常
boolean isCancelled(); //判断是否取消
boolean isDone(); //判断是否执行完成
V get() throws InterruptedException, ExecutionException;//获取执行结果,会阻塞住线程
V get(long timeout, TimeUnit unit)

    throws InterruptedException, ExecutionException, TimeoutException; //获取执行结果,第一参数是时间,第二个是时间单位,
                                                                //如果在指定时间未获取成功,会抛出TimeoutException异常

3)从上面的方法中我们看到FutureTask同时继承了Runnable方法,所以也有run方法,如下可以发现run方法主要调用了call方法

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

4) 在run方法的开头判断了state,那state是什么?如下

/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
api注释大概意思是任务的初始状态是NEW,当获取到结果呈现COMPLETING状态,它是瞬时的,当调用者打断时,它是INTERRUPTING,也是瞬时态。
NORMAL顺利执行完,CANCELED被取消,INTERRUPTTED被打断,EXCEPTIONAL异常。状态转换看上面灰色部分。

5)get方法,主要调用awaitDone方法,主要是等待执行完成,如果出现异常或者超时会终止。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

3 应用

1)需要并行执行,任务不多情况下

2)高并发情况下数创建数据库连接,我们可以通过一个线程创建一个连接,并将这个连接放入Callable的call方法中,

这样其他线程通过调用get方法就可以获取该连接,避免同时创建多个connection,浪费资源