java8的CompletableFuture的使用
一 CompletableFuture的作用
Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行
但是Future是比较消耗CPU的,而且可用的方法也寥寥无几, 比如get()
,如果异步执行的时间比较长,主线程会一直阻塞傻等,直到future获取返回值.
CompletableFuture
是Java8的一个新加的类,它在原来的Future
类上,结合Java8的函数式编程,扩展了一系列强大的功能.
二 如何使用CompletableFuture
0. 准备工作
新建一个类,准备好一个线程池,和几个测试方法
package com.zgd.demo.thread.java8;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.time.Instant;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* @Author: zgd
* @Date: 18/10/29 14:36
* @Description: java8的CompletableFuture使用实例
*/
@Slf4j
public class CompleteFutureTest {
private static ThreadPoolExecutor pool;
private static int core = 10;
private static int max = 10;
private static int queueCapcity = 1024;
private Instant startTime;
private Instant endTime;
static {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
//初始化一个线程池
pool = new ThreadPoolExecutor(core, max, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapcity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
}
/**
* 线程测试方法
*
* @return
*/
public static Integer run(int a, int b,int sec) {
try {
log.info("----------run方法开始执行,休眠{}秒-------\ta:{}\tb:{}",sec, a, b);
Thread.sleep(sec * 1000);
int i = a / b;
log.info("----------run方法结束执行--------\tresult:{}", i);
return i;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 线程测试方法
*
* @return
*/
public static Integer run2(int a) {
//当a为偶数的时候,抛出一个异常
if (a % 2 == 0) {
int i = 1 / 0;
}
int i = a / 2;
return i;
}
/**
* 线程测试方法
*
* @return
*/
public static String run3(String s, int a) {
try {
log.info(Thread.currentThread().getName()+"----------开始休眠{}秒-------", a);
Thread.sleep(a*1000);
log.info(Thread.currentThread().getName()+"----------休眠结束--------");
return s;
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
public void sleep(long sec) throws InterruptedException {
log.info("主线程休眠{}秒....", sec);
TimeUnit.SECONDS.sleep(sec);
log.info("主线程休眠结束....");
}
@Before
public void before() {
log.info("开始测试");
startTime = Instant.now();
}
@After
public void after() {
endTime = Instant.now();
log.info("结束测试,共耗时:\t{}", ChronoUnit.MILLIS.between(startTime, endTime));
}
.....
}
1. runAsync, 创建一个简单的没有返回值的Future
这里使用线程池异步调用前面的run
方法, 传入的第三个参数1,也就是这个异步线程等待1秒,
接着又在主线程中sleep(3)等待3秒,看下运行结果
/**
* 创建一个简单的没有返回值的CompleteFuture
* runAsync 没有返回值
* 耗时 3038
*/
@Test
public void fun01() throws InterruptedException {
CompletableFuture.runAsync(() -> run(10, 5,1), pool);
sleep(3);
}
可以看出主线程和异步线程分别执行了,总耗时取决于等待时间3秒的主线程
这个时候如果吧两者调换,主线程等待1秒,异步run方法等待3秒看看
@Test
public void fun01() throws InterruptedException {
CompletableFuture.runAsync(() -> run(10, 5,3), pool);
sleep(1);
}
可以看出,主线程等待1秒后,就结束方法了,因为是直接用单元测试测的,主线程跑完,其他线程也就关闭了,异步run方法没能执行完毕
这样的话该怎么办呢?如果是用传统的Future future = pool.submit(runable)
, 使用future.get()
,主线程会等到callback执行完毕以后,拿到返回值了再跑完主流程, CompletableFuture如果也要实现这种效果,也是用get()
@Test
public void fun01() throws InterruptedException, ExecutionException {
CompletableFuture<Void> f = CompletableFuture.runAsync(() -> run(10, 5, 3), pool);
sleep(1);
f.get();
}
get方法也就类似于传统的future去阻塞获取异步线程的返回值
2. supplyAsync 创建一个简单的没有返回值的CompleteFuture
上一个方法使用到了get,但是上一个方法是没有返回值的,这个方法使用get来获取返回值
/**
* 创建一个简单的没有返回值的CompleteFuture
* supplyAsync 有返回值,类似Future
* CompletableFuture使用supplyAsync(),会异步调用方法,调用get()方法,会在获取到future返回值后,再执行get()以后的程序
* 耗时 3040
*/
@Test
public void fun02() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> run(10, 5,1), pool);
log.info("----get阻塞开始----");
Integer i = f.get();
log.info("----get阻塞结束----");
log.info("获取的值是:{}",i);
}
3. 监听future返回,thenApply, thenAccept, thenRun
这三个方法很类似
/**
* thenApply() 监听future返回,调用Future方法对返回值进行修改和操作,这个操作有返回值,比如转换类型
* thenAccept() 监听future返回,调用Consumer处理返回值,处理的结果没有返回值,比如打印结果,这样的话最终的CompletableFuture也拿不到返回值
* thenRun() 监听future返回,然后自己自定义处理
* 耗时 7037 ,线程都是同一个demo-pool-0
*/
@Test
public void fun03() throws InterruptedException, ExecutionException {
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> run(10, 5, 1), pool)
.thenApply(i ->String.valueOf(i).concat("abc"))
.thenAccept(s ->log.info("thenAccept接收的结果是:{}", s))
.thenRun(() ->log.info("在业务后处理其他的流程"))
.thenAccept(s -> log.info("全部结束"));
f.get();
}
首先supplyAsync调用run方法,计算10/5的值,得到Integer,
然后thenApply中将Integer转为String,再拼接一个"abc",
然后thenRun做一些自己的业务处理
idea中可以看到每个方法得到的结果类型, thenApply
是有返回值的,参数是Function
, thenAccept
是没有返回值的,参数是Comsumer
4. thenApplyAsync, thenAcceptAsync, thenRunAsync
上面三个方法还有三个类似的方法,与thenApply,thenAccept,thenRun相比,是另起一个线程执行
/**
* thenApplyAsync() 异步 调用Future方法对返回值进行修改和操作,这个操作有返回值,比如转换类型
* thenAcceptAsync() 异步 调用Consumer处理返回值,处理的结果没有返回值,比如打印结果 最终CompletableFuture也拿不到返回值
* thenRunAsync() 异步
* <p>
* 与thenApply,thenAccept,thenRun相比,是另起一个线程执行,但是因为他们都需要拿到上一个方法的值,所以这里异步开启线程与同步耗时是一样的
* 耗时:7045 线程不一样,都开启了新线程
*/
@Test
public void fun04() throws Exception {
CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> run(10, 5, 1), pool)
.thenApplyAsync(i -> {
log.info("thenApplyAsync休眠3秒");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return String.valueOf(i).concat("abc");
}, pool)
.thenAcceptAsync(s -> {
log.info("thenAcceptAsync接收的结果是:{},休眠2秒", s);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, pool)
.thenRunAsync(() -> {
log.info("thenRunAsync开一个线程,休眠1秒");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, pool)
.thenAccept(s -> log.info("全部结束"));
f.get();
}
根据日志可以看出, 他们分别开启了一个线程去执行, 但是由于他们都需要拿到上一个方法的返回值,所以这里和同步的效果是差不多的
5 处理异常exceptionally
在获取另一个线程的执行结果时,有可能异步处理抛出异常,使用Java8的CompletableFuture就可以很好的解决这个问题
这里异步调用run2方法,这个方法当传入的参数是偶数会跑出除零异常, IntStream.range(1, 11)可以获取1到10的整数, 将其分别调用run2
/**
* 处理异常 ,传入1到10的数调用run2(),当传入的是偶数会抛出异常
* exceptionally 对抛出的异常进行处理
*/
@Test
public void fun05() {
//流式处理,从1到10异步调用run2(),将future都存到list中
List<CompletableFuture<Integer>> futures = IntStream.range(1, 11)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> run2(i), pool).exceptionally(e -> {
log.info("抛出了一个异常:{}", e.getCause().toString());
//返回一个默认值
return 0;
}))
.collect(Collectors.toList());
//获取值
String res = futures.stream()
.map(f -> {
try {
return f.thenApply(String::valueOf).get();
} catch (Exception e) {
e.printStackTrace();
}
return "";
}).collect(Collectors.joining(" , ", "", ""));
System.out.println("res = " + res);
}
如果报错的话就返回0
6 thenCompose 多层结构的future返回一个结果,对应的是java8的flatmap
如果CompletableFuture返回的值,又是一个CompletableFuture,这样层层嵌套的话,获取值只能get().get(), 但是使用thenCompose的话就可以把这些值拉到同一个维度来获取
/**
* 多层结构的future返回一个结果,对应的是java8的flatmap
* thenCompose
*/
@Test
public void fun06() throws ExecutionException, InterruptedException {
//第一个层future,调用了一次方法,直接返回"aaa"
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "aaa", pool);
//future中又异步调用了一次方法(入参s拼接一个"bbb")
CompletableFuture<CompletableFuture<String>> f2 = f1.thenApply(s -> CompletableFuture.supplyAsync(() -> s.concat("bbb"), pool));
//第一种获取值的方法
System.out.println("f2 = " + f2.get().get());
//使用thenCompose
CompletableFuture<String> f3 = f1.thenCompose(s -> CompletableFuture.supplyAsync(() -> s.concat("bbb"), pool));
System.out.println("f3 = " + f3.get());
}
7 henCombine与thenAcceptBoth, 合并两个future
如果我们同时有两个需要异步操作的流程,那么我们就需要使用两个CompletableFuture, 那怎么把这两个CompletableFuture合并,就需要这两个方法
/**
* thenCombine与thenAcceptBoth
* thenCombine 合并两个future,对两个返回值进行处理,有返回值
* thenAcceptBoth 同时接收两个future返回值,合并成一个future,对两个返回值进行处理,没有返回值
* 共耗时: 3043 f1耗时2秒和f3耗时3秒
*/
@Test
public void fun07() throws ExecutionException, InterruptedException {
//休眠2秒的"hello"
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> run3("hello", 2), pool);
//休眠3秒的"zgd"
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> run3("zgd", 3), pool);
//合并两个future
CompletableFuture<String> f3 = f1.thenCombine(f2, (s, s2) -> {
log.info("合并两个返回值:\ts1:{}\ts2:{}", s, s2);
return s.concat(s2);
});
//合并两个future
CompletableFuture<Void> f4 = f1.thenAcceptBoth(f2, (s1, s2) -> {
log.info("s1: {}\t s2: {}",s1,s2);
});
log.info("f3: {}",f3.get());
f4.get();
}
thenAcceptBoth的话没有返回值,也就是他的参数是Comsumer, thenCombine有返回值,你可以把两个值进行合并,两个整数进行加减运算, 而且虽然一个是线程等待2秒, 一个线程等待3秒,但是因为是开启两个线程同时执行,所以拿到合并结果的时候是等待3秒
8 applyToEither与acceptEither 获取最先返回的future
/**
* applyToEither与acceptEither
* applyToEither 取2个future中最先返回的,有返回值
* acceptEither 取2个future中最先返回的,无返回值
* 共耗时: 2035 拿到第一个返回的值后,程序就执行完毕了
*/
@Test
public void fun08() throws ExecutionException, InterruptedException {
//休眠2秒的"hello"
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> run3("hello", 2), pool);
//休眠3秒的"zgd"
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> run3("zgd", 3), pool);
//获取最先返回的,转成大写
CompletableFuture<String> f3 = f1.applyToEither(f2, s -> s.toUpperCase());
System.out.println(LocalTime.now().toString() + "\tf3:最先返回的是\t" + f3.get());
//获取最先返回的,转成大写
CompletableFuture<Void> f4 = f1.acceptEither(f2, s -> log.info("f4:最先返回的是:\t{}", s.toUpperCase()));
f4.get();
}
一个等待2秒.一个等待3秒, 最终耗时2秒, 3秒还没来得及返回就结束了
9 allOf和anyOf 也是获取最快的future
和上一个方法不同,是从一个CompleteableFuture数组里面,获取最快返回的future
- anyOf:
/**
* allOf与anyOf
* allOf 在一个CompletableFuture数组中,等待所有future返回
* anyOf 在一个CompletableFuture数组中,拿到最快的future返回
* 耗时 1053 f1.get()在第一个future返回时,就轮询结束
*/
@Test
public void fun11() throws ExecutionException, InterruptedException {
//生成一个休眠时间从1到10秒的future的list
List<CompletableFuture<String>> futures = IntStream.range(1, 11)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> run3(String.format("sleep:%d", i), i), pool))
.collect(Collectors.toList());
CompletableFuture<Object> f1 = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{}));
f1.thenRun(() -> {
log.info("最快的future已经执行完成");
try {
log.info("f1.get():{}", f1.get());
} catch (Exception e) {
e.printStackTrace();
}
});
log.info("----get轮询开始----");
f1.get();
log.info("----get轮询结束----");
}
1秒中就结束了
但是这里需要注意的是: 如果遍历Futures数组, 把里面的future调用get()方法的话,跟前面说的一样,会被阻塞直到获取返回值, 所以整个流程会等所有的Future都有返回以后才结束
@Test
public void fun11() throws ExecutionException, InterruptedException {
//生成一个休眠时间从1到10秒的future的list
List<CompletableFuture<String>> futures = IntStream.range(1, 11)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> run3(String.format("sleep:%d", i), i), pool))
.collect(Collectors.toList());
CompletableFuture<Object> f1 = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{}));
f1.thenRun(() -> {
log.info("最快的future已经执行完成");
futures.stream().forEach(f -> {
try {
log.info("result:\t{}", f.get());
} catch (Exception e) {
e.printStackTrace();
}
});
// try {
// log.info("f1.get():{}", f1.get());
// } catch (Exception e) {
// e.printStackTrace();
// }
});
log.info("----get轮询开始----");
f1.get();
log.info("----get轮询结束----");
}
时间为10秒
- allOf
等待所有future返回
/**
* allOf与anyOf
* allOf 在一个CompletableFuture数组中,等待所有future返回 没有返回值
* anyOf 在一个CompletableFuture数组中,拿到最快的future返回 有返回值
* 耗时 10047 f1.get()在所有的futures都返回时,才执行完毕
*/
@Test
public void fun10() throws ExecutionException, InterruptedException {
//生成一个休眠时间从1到10秒的future的list
List<CompletableFuture<String>> futures = IntStream.range(1, 11)
.mapToObj(i -> CompletableFuture.supplyAsync(() -> run3(String.format("sleep:%d", i), i), pool))
.collect(Collectors.toList());
CompletableFuture<Void> f1 = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
f1.thenRun(() -> {
log.info("所有的future已经执行完成");
futures.stream().forEach(f -> {
try {
log.info("result:\t{}", f.get());
} catch (Exception e) {
e.printStackTrace();
}
});
});
log.info("----get轮询开始----");
f1.get();
log.info("----get轮询结束----");
}