CompletableFuture学习
/**
* CompletableFuture源码中有四个静态方法用来执行异步任务:
* 四个静态方法分别如下:
*
* public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..} ---有返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
*
* public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..} ---有返回值,可以指定线程池
*
* public static CompletableFuture<Void> runAsync(Runnable runnable){..} --- 无返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
*
* public static CompletableFuture<Void> runAsync(Runnable runnable, --- 无返回值,可以指定线程池
* Executor executor){..}
*/
/**
* 获取执行返回值的几个方法
*
*V get(); --- 如果返回值没有返回,一直阻塞,阻塞当前线程的执行
* V get(long timeout,Timeout unit); --- 设置阻塞时间,在超时时间内阻塞当前线程的执行
* T getNow(T defaultValue); --- 有返回值就返回,线程抛出异常就返回设置的默认值
* T join(); --- 返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别
*
*
* 上面两个方法是Future中的实现方式,get()会堵塞当前的线程,这就造成了一个问题,如果执行线程迟迟没有返回数据,get()会一直等待下去,因此,第二个get()方法可以设置等待的时间.
*
* getNow()方法比较有意思,表示当有了返回结果时会返回结果,如果异步线程抛了异常会返回自己设置的默认值.
*/
CompletableFuture类实现了Future和CompletionStage接口,
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {....}
关于Future接口,官方api介绍:
public interface Future<V>
Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
用法示例(注意,下列各类都是构造好的。)
interface ArchiveSearcher { String search(String target); }
class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(final String target)
throws InterruptedException {
Future<String> future
= executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) { cleanup(); return; }
}
}
FutureTask
类是 Future 的一个实现,Future 可实现 Runnable,所以可通过 Executor 来执行。例如,可用下列内容替换上面带有 submit 的构造:
FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
executor.execute(future);
他有四个方法:
boolean |
cancel(boolean mayInterruptIfRunning) 试图取消对此任务的执行。 |
V |
get() 如有必要,等待计算完成,然后获取其结果。 |
V |
get(long timeout, TimeUnit unit) 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。 |
boolean |
isCancelled() 如果在任务正常完成前将其取消,则返回 true。 |
boolean |
isDone() 如果任务已完成,则返回 true。 |
但是在CompletableFuture类中不推荐使用实现Future接口的5个方法,原因是它自个封装了更多的方法来使用.
下面开始为测试做准备:
创建一个线程池类:
BusinessExecutor.java
package com.hf.completablefuture.executor;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description: 线程池
* @Date: 2019/5/7
* @Auther:
*/
@Component //单例模式,存入spring容器中
public class BusinessExecutor {
private ThreadPoolExecutor poolExecutor;
BlockingQueue blockingQueue = null;
/**
* 核心线程大小
*/
private static Integer corePoolSize = 5;
/**
* 最大线程数
*/
private static Integer maxPoolSize = 10;
/**
* 活跃时间,为指定单位
*/
private static Integer keepAlive = 10;
/**
* 初始化线程池,在构造函数完成后自动执行该方法
*/
@PostConstruct
private void initPool(){
//初始化队列
blockingQueue = new LinkedBlockingQueue<>(5);
//初始化线程池
poolExecutor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAlive, TimeUnit.SECONDS,blockingQueue);
}
/**
* 提供给外界获取线程池
* @return
*/
public ThreadPoolExecutor getPoolExecutor(){
return this.poolExecutor;
}
}
上面使用了@PostConstruct注解,该注解的作用是初始化方法中的代码,和static静态代码块的区别是,静态代码块是在类初始化的时候最先初始化的,接着在初始化构造器,最后初始化打上了@PostConstruct注解的代码,也就是依次顺序是:
static代码块 > 构造函数 > @PostConstruct注解的代码
创建测试用例;
package com.hf.completablefuture.completablefuture;
import com.hf.completablefuture.executor.BusinessExecutor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
* @Description: 测试CompletableFuture
* @Date: 2019/5/7
* @Auther:
*/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class CompletableFutureTest {
@Autowired
private BusinessExecutor businessExecutor;
private Executor poolExecutor;
@Before
public void getThreadPoolExecutor(){
poolExecutor = businessExecutor.getPoolExecutor();
}
/**
* CompletableFuture源码中有四个静态方法用来执行异步任务:
* 四个静态方法分别如下:
*
* public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..} ---有返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
*
* public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..} ---有返回值,可以指定线程池
*
* public static CompletableFuture<Void> runAsync(Runnable runnable){..} --- 无返回值,默认使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码
*
* public static CompletableFuture<Void> runAsync(Runnable runnable, --- 无返回值,可以指定线程池
* Executor executor){..}
*/
/**
* 获取执行返回值的几个方法
*
*V get(); --- 如果返回值没有返回,一直阻塞
* V get(long timeout,Timeout unit); --- 设置阻塞时间
* T getNow(T defaultValue); --- 有返回值就返回,线程抛出异常就返回设置的默认值
* T join(); --- 返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别
*
*
* 上面两个方法是Future中的实现方式,get()会堵塞当前的线程,这就造成了一个问题,如果执行线程迟迟没有返回数据,get()会一直等待下去,因此,第二个get()方法可以设置等待的时间.
*
* getNow()方法比较有意思,表示当有了返回结果时会返回结果,如果异步线程抛了异常会返回自己设置的默认值.
*/
@Test
public void test01(){
try {
//异步执行
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 100, poolExecutor);
//获取返回值
System.out.println("future1返回值:" + future1.get()); //输出 100
CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(() -> {
//睡眠5秒
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 100;
}, poolExecutor);
//注意调用get方法是会阻塞当前线程,直到有返回值才会放行的
//System.out.println("future02返回值" + future02.get()); //输出100,会一直等待future02结果返回,或者直到出现了异常
//查看异步线程是否执行完成
System.out.println(future02.isDone()); //输出false
System.out.println("future022返回值" + future02.get(1, TimeUnit.SECONDS)); //如果注释掉上面的get方法,那么会报异常TimeoutException,原因是在异步线程中sleep了5秒,但是这行代码设置超时1秒返回,直接过1秒就过去了阻塞,让主线程执行
} catch (Exception e) {
log.error("出现异常{}:" + e.getMessage(),e);
}
}
@Test
public void test02(){
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
}, poolExecutor);
//future.join();
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
log.error("出现异常{}:" + e.getMessage(),e);
}
}
}
在test02中区别了get和join方法的区别:
join
返回计算的结果或者抛出一个unchecked异常(CompletionException)
如下图:
使用join方法异步调用异常的代码
使用get方法异步调用异常的代码:
看test01的测试:
thenAccept(): 功能:当前任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数,无返回值.
/**
* thenAccept
* 功能:当前任务正常完成以后执行,当前任务的执行结果可以作为下一任务的输入参数,无返回值.
* 执行任务A,用A的返回值作为异步任务B的入参执行,无返回值
*
*/
@Test
public void test03(){
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "任务A";
},poolExecutor);
CompletableFuture<Void> futureB = futureA.thenAccept(a -> {
System.out.println("执行任务B");
System.out.println("入参应该是'任务A':" + a); //输出任务A,如果在异步任务中时间太长,会直接主程序就接收了
});
try {
futureB.get(); //使用get阻塞当前主线程的执行,直到有结果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
thenRun():对不关心上一步的计算结果,执行下一个操作
/**
* thenRun()
* 对不关心上一步的计算结果,执行下一个操作
*/
@Test
public void test04(){
CompletableFuture<Object> futureA = CompletableFuture.supplyAsync(() -> "任务A");
CompletableFuture<Void> futureB = futureA.thenRun(() -> System.out.println("任务B执行")); //输出任务B执行
}
thenApply():
当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数,有返回值
/**
* thenApply():当前任务正常完成以后执行,当前任务的执行的结果会作为下一任务的输入参数,有返回值
*
* A任务执行,B任务依赖A任务的结果,作为入参执行,C任务依赖B任务的结果执行,最终返回C任务的结果
*/
@Test
public void test05(){
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "hi");
CompletableFuture<String> futureB = futureA.thenApply((a) -> a + ",hello"); //参数a是 hi
CompletableFuture<String> futureC = futureB.thenApply((b) -> b + ",world!"); //参数b是 hi,hello
try {
System.out.println(futureC.get()); //输出 hi,hello,world
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
上面的代码,我们当然可以先调用future.join()先得到任务A的返回值,然后再拿返回值做入参去执行任务B,而thenApply的存在就在于帮我简化了这一步,我们不必因为等待一个计算完成而一直阻塞着调用线程,而是告诉CompletableFuture你啥时候执行完就啥时候进行下一步. 就把多个任务串联起来了.
thenCombine(..) thenAcceptBoth(..) runAfterBoth(..)
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
功能:结合两个CompletionStage的结果,进行转化后返回
/**
* thenCombine():是结合两个任务的返回值进行转化后再返回
* A执行完,B执行完,将两者结合起来,做逻辑处理
*/
@Test
public void test06(){
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture<Double> futureB = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture<Double> futureC = futureA.thenCombine(futureB, (a,b) -> {
System.out.println("a参数是:" + a); //500
System.out.println("b参数是:" + b); //0.8
return a * b;
});
try {
System.out.println(futureC.get(2,TimeUnit.SECONDS)); //输出400.0
} catch (Exception e) {
e.printStackTrace();
}
}
如果需要上两个任务的返回值,就用thenAcceptBoth(),但是无返回值;如果不需要上两个任务的返回值,并且本身也没有返回值就用runAfterBoth(),如下:
@Test
public void test06(){
CompletableFuture<Integer> futureA = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture<Double> futureB = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture<Double> futureC = futureA.thenCombine(futureB, (a,b) -> {
System.out.println("a参数是:" + a); //500
System.out.println("b参数是:" + b); //0.8
return a * b;
});
CompletableFuture<Void> futureD = futureA.thenAcceptBoth(futureB, (a, b) -> System.out.println("执行任务D"));
CompletableFuture<Void> futureE = futureA.runAfterBoth(futureB, () -> System.out.println("执行任务E"));
try {
System.out.println(futureC.get(2,TimeUnit.SECONDS)); //输出400.0
System.out.println(futureD.get()); //输出null,因为没有返回值
System.out.println(futureE.get()); //输出null,因为没有返回值
} catch (Exception e) {
e.printStackTrace();
}
}
未完待续....
最后,感谢引用博主的分享:
https://blog.****.net/qq_42606051/article/details/84028376
https://colobu.com/2016/02/29/Java-CompletableFuture/