Java 并行程序设计模式 (Future 模式)

并行设计模式属于设计优化的一部分,它是对一些常用的多线程结构的总结和抽象。与串行程序相比,并行程序的结构通常更为复杂。因此,合理的使用并行模式在多线程开发中,更具有积极意义。并行程序设计模式主要有 Future模式 、Master-Worker模式、Guarded Suspension模式、不变模式和生产者-消费者模式,本文主要讲解  Future模式

Future模式的核心在于去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他的业务逻辑,从而充分利用计算机资源。


Future模式的代码实现:


(1)Data的实现
        Data是一个接口,提供了getResult()方法。无论FutureData或者RealData都实现了这个接口
1
2
3
public interface Data {
    public String getResult();
}
(2)RealData
        RealData是最终的要返回的数据,比较慢,用sleep模拟
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.TimeUnit;
 
public class RealData implements Data {
     
    protected final String result;
     
    public RealData(String para){
        //realData的构造可能很慢,需要用户等待很久,这里使用sleep模拟
        StringBuffer sBuffer = new StringBuffer();
        for (int i = 0; i < 5; i++) {
            sBuffer.append(para).append("|");
            //sleep 3秒模拟耗时
            try {
                TimeUnit.SECONDS.sleep(1);
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("真实数据生成完毕");
        result = sBuffer.toString();
    }
 
    public String getResult() {
        return result;
    }
 
}
(3)FutureData 
        FutureData 实现了一个快速返回的RealData包装。它只是一个包装,可以很快被构造并返回。当使用FutureData的getResult()方法时,程序会阻塞,等到RealData被注入到程序中,才使用RealData的getResult()方法返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.tlk.chapter4.future2;
 
public class FutureData implements Data {
    protected RealData realData = null//FutureData是RealData的包装
    protected boolean isReady = false;
     
     
    public synchronized  void setRealData(RealData realData){
        if (isReady) {
            return;
        }
        this.realData = realData;
        isReady = true;
        notifyAll();                //RealData已经被注入,通过getResult()
    }
     
    public synchronized String getResult() {
        while(!isReady){
            try {
                wait();     //一直等待,直到RealData被注入
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
         
        return realData.result; //RealData实现的返回的结果
    }
 
}
(4) Client的实现
Client主要实现了获取FutureData,开启构造RealData的线程,并在接受请求后,很快的返回FutureData
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.tlk.chapter4.future2;
 
/**
 * Client 主要实现了获取FutureData,开启构造RealData的线程,并在接受请求后,很快的返回FutureData
 * @author tanlk
 * @date 2017年7月20日下午5:11:20
 */
public class Client {
 
    public Data request(final String queryStr){
        final FutureData future = new FutureData();
         
        new Thread(){
            public void run() {
                RealData realData = new RealData(queryStr);
                future.setRealData(realData);
            };
        }.start();
         
        return future;
    }
}
(5) Main函数的实现 
        主要负责调用Client发起请求,并使用返回的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.tlk.chapter4.future2;
 
import java.util.concurrent.TimeUnit;
 
public class Main {
     
    public static void main(String[] args) {
        Client client = new Client();
         
        Data data = client.request("hello");
         
        System.out.println("请求完毕");
        try {
            //处理其他耗时任务
            TimeUnit.SECONDS.sleep(2);
        catch (InterruptedException e) {
            e.printStackTrace();
        }
         
        System.out.println(data.getResult());
    }
}
上诉代码用到的最主要的就是线程的等待和唤醒,wait()和notifyAll()

JDK  内置Future模式的实现

            核心结构示意图

Java 并行程序设计模式 (Future 模式)

其中,最重要的模块是FutureTask类,它实现了Runnable接口,作为单独的线程运行。在其run()方法中,通过Sync内部类,调用Callable接口,并维护Callable接口返回的对象。当使用FutureTask.get()方法时,将返回Callable接口的返回对象。


1. 通过实现Callable接口的call()方法,指定FutureTask的实际工作内容和返回对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
 
public class TrueData implements Callable<String> {
    private String para;
     
    public TrueData(){
         
    }
     
    public TrueData(String para){
        this.para = para;
    }
     
    public String call() throws Exception {
        //trueData的构造可能很慢,需要用户等待很久,这里使用sleep模拟
        StringBuffer sBuffer = new StringBuffer();
        for (int i = 0; i < 5; i++) {
            sBuffer.append(para).append("|");
            //sleep 1秒模拟耗时
            try {
                TimeUnit.SECONDS.sleep(1);
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("真实数据生成完毕");
        return sBuffer.toString();
    }
 
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
public class FutureTaskTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
         
        FutureTask<String> future = new FutureTask<String>(new TrueData("hi"));
         
        ExecutorService executor = Executors.newFixedThreadPool(1);
        //执行FutureTask,相当于上例的client。request()发送请求
        //在这里开启线程进行TrueData的call()执行
        executor.submit(future);
        System.out.println("请求完毕");
        try {
            //执行其他耗时任务
            TimeUnit.SECONDS.sleep(2);
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        //发现最终执行结果是真实数据生成完毕,即异步任务并没有被cancel而是一直执行完毕,这样看来这个cancel方法有点名不副实啊
        //我们如果查看FutureTask的源码就会发现cancel只不过是调用了Thread的interrupt方法,而interrupt只能是停掉线程中有sleep,wait,join
        //逻辑的线程,抛出一个InterruptException。这样看来FutureTask的cancel方法并不能停掉一切正在执行的异步任务
        //future.cancel(true);
        //System.out.println("超时获取数据:" + future.get(1, TimeUnit.SECONDS)); //会抛出java.util.concurrent.TimeoutException
        System.out.println("数据:" + future.get());
    }
}
注意:查看FutureTask的源码就会发现cancel只不过是调用了Thread的interrupt方法,而interrupt只能是停掉线程中有sleep,wait,join逻辑的线程,抛出一个InterruptException。这样看来FutureTask的cancel方法并不能停掉一切正在执行的异步任务