Java 并行程序设计模式 (Future 模式)
并行设计模式属于设计优化的一部分,它是对一些常用的多线程结构的总结和抽象。与串行程序相比,并行程序的结构通常更为复杂。因此,合理的使用并行模式在多线程开发中,更具有积极意义。并行程序设计模式主要有 Future模式 、Master-Worker模式、Guarded
Suspension模式、不变模式和生产者-消费者模式,本文主要讲解 Future模式
Future模式的核心在于去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他的业务逻辑,从而充分利用计算机资源。
Future模式的代码实现:
(1)Data的实现
Data是一个接口,提供了getResult()方法。无论FutureData或者RealData都实现了这个接口
1
2
3
|
public interface Data { public String getResult();
} |
(2)RealData
RealData是最终的要返回的数据,比较慢,用sleep模拟
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
import java.util.concurrent.TimeUnit; public class RealData implements Data { protected final String result;
public RealData(String para){
//realData的构造可能很慢,需要用户等待很久,这里使用sleep模拟
StringBuffer sBuffer = new StringBuffer();
for (int i = 0; i < 5; i++) {
sBuffer.append(para).append( "|" );
//sleep 3秒模拟耗时
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println( "真实数据生成完毕" );
result = sBuffer.toString();
}
public String getResult() {
return result;
}
} |
(3)FutureData
FutureData 实现了一个快速返回的RealData包装。它只是一个包装,可以很快被构造并返回。当使用FutureData的getResult()方法时,程序会阻塞,等到RealData被注入到程序中,才使用RealData的getResult()方法返回
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package com.tlk.chapter4.future2; public class FutureData implements Data { protected RealData realData = null ; //FutureData是RealData的包装
protected boolean isReady = false ;
public synchronized void setRealData(RealData realData){
if (isReady) {
return ;
}
this .realData = realData;
isReady = true ;
notifyAll(); //RealData已经被注入,通过getResult()
}
public synchronized String getResult() {
while (!isReady){
try {
wait(); //一直等待,直到RealData被注入
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realData.result; //RealData实现的返回的结果
}
} |
(4) Client的实现
Client主要实现了获取FutureData,开启构造RealData的线程,并在接受请求后,很快的返回FutureData
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.tlk.chapter4.future2; /** * Client 主要实现了获取FutureData,开启构造RealData的线程,并在接受请求后,很快的返回FutureData
* @author tanlk
* @date 2017年7月20日下午5:11:20
*/
public class Client { public Data request(final String queryStr){
final FutureData future = new FutureData();
new Thread(){
public void run() {
RealData realData = new RealData(queryStr);
future.setRealData(realData);
};
}.start();
return future;
}
} |
(5) Main函数的实现
主要负责调用Client发起请求,并使用返回的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.tlk.chapter4.future2; import java.util.concurrent.TimeUnit; public class Main { public static void main(String[] args) {
Client client = new Client();
Data data = client.request( "hello" );
System.out.println( "请求完毕" );
try {
//处理其他耗时任务
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(data.getResult());
}
} |
上诉代码用到的最主要的就是线程的等待和唤醒,wait()和notifyAll()
JDK 内置Future模式的实现
核心结构示意图
其中,最重要的模块是FutureTask类,它实现了Runnable接口,作为单独的线程运行。在其run()方法中,通过Sync内部类,调用Callable接口,并维护Callable接口返回的对象。当使用FutureTask.get()方法时,将返回Callable接口的返回对象。
1. 通过实现Callable接口的call()方法,指定FutureTask的实际工作内容和返回对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; public class TrueData implements Callable<String> { private String para;
public TrueData(){
}
public TrueData(String para){
this .para = para;
}
public String call() throws Exception {
//trueData的构造可能很慢,需要用户等待很久,这里使用sleep模拟
StringBuffer sBuffer = new StringBuffer();
for (int i = 0; i < 5; i++) {
sBuffer.append(para).append( "|" );
//sleep 1秒模拟耗时
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println( "真实数据生成完毕" );
return sBuffer.toString();
}
} |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureTaskTest { public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
FutureTask<String> future = new FutureTask<String>( new TrueData( "hi" ));
ExecutorService executor = Executors.newFixedThreadPool(1);
//执行FutureTask,相当于上例的client。request()发送请求
//在这里开启线程进行TrueData的call()执行
executor.submit(future);
System.out.println( "请求完毕" );
try {
//执行其他耗时任务
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//发现最终执行结果是真实数据生成完毕,即异步任务并没有被cancel而是一直执行完毕,这样看来这个cancel方法有点名不副实啊
//我们如果查看FutureTask的源码就会发现cancel只不过是调用了Thread的interrupt方法,而interrupt只能是停掉线程中有sleep,wait,join
//逻辑的线程,抛出一个InterruptException。这样看来FutureTask的cancel方法并不能停掉一切正在执行的异步任务
//future.cancel(true);
//System.out.println("超时获取数据:" + future.get(1, TimeUnit.SECONDS)); //会抛出java.util.concurrent.TimeoutException
System.out.println( "数据:" + future.get());
}
} |
注意:查看FutureTask的源码就会发现cancel只不过是调用了Thread的interrupt方法,而interrupt只能是停掉线程中有sleep,wait,join逻辑的线程,抛出一个InterruptException。这样看来FutureTask的cancel方法并不能停掉一切正在执行的异步任务