java8的CompletableFuture的使用

一 CompletableFuture的作用

Future是Java 5添加的类,用来描述一个异步计算的结果。你可以使用isDone方法检查计算是否完成,或者使用get阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel方法停止任务的执行
但是Future是比较消耗CPU的,而且可用的方法也寥寥无几, 比如get(),如果异步执行的时间比较长,主线程会一直阻塞傻等,直到future获取返回值.

CompletableFuture是Java8的一个新加的类,它在原来的Future类上,结合Java8的函数式编程,扩展了一系列强大的功能.

二 如何使用CompletableFuture

0. 准备工作

新建一个类,准备好一个线程池,和几个测试方法

package com.zgd.demo.thread.java8;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Instant;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * @Author: zgd
 * @Date: 18/10/29 14:36
 * @Description: java8的CompletableFuture使用实例
 */
@Slf4j
public class CompleteFutureTest {

  private static ThreadPoolExecutor pool;

  private static int core = 10;
  private static int max = 10;
  private static int queueCapcity = 1024;
  private Instant startTime;
  private Instant endTime;

  static {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    //初始化一个线程池
    pool = new ThreadPoolExecutor(core, max, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueCapcity), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
  }


  /**
   * 线程测试方法
   *
   * @return
   */
  public static Integer run(int a, int b,int sec) {
    try {
      log.info("----------run方法开始执行,休眠{}秒-------\ta:{}\tb:{}",sec, a, b);
      Thread.sleep(sec * 1000);
      int i = a / b;
      log.info("----------run方法结束执行--------\tresult:{}", i);
      return i;
    } catch (Exception e) {
      e.printStackTrace();
      return 0;
    }
  }

  /**
   * 线程测试方法
   *
   * @return
   */
  public static Integer run2(int a) {
    //当a为偶数的时候,抛出一个异常
    if (a % 2 == 0) {
      int i = 1 / 0;
    }
    int i = a / 2;
    return i;
  }


  /**
   * 线程测试方法
   *
   * @return
   */
  public static String run3(String s, int a) {
    try {
      log.info(Thread.currentThread().getName()+"----------开始休眠{}秒-------", a);
      Thread.sleep(a*1000);
      log.info(Thread.currentThread().getName()+"----------休眠结束--------");
      return s;
    } catch (Exception e) {
      e.printStackTrace();
      return "error";
    }
  }



  public void sleep(long sec) throws InterruptedException {
    log.info("主线程休眠{}秒....", sec);
    TimeUnit.SECONDS.sleep(sec);
    log.info("主线程休眠结束....");
  }


  @Before
  public void before() {
    log.info("开始测试");
    startTime = Instant.now();
  }

  @After
  public void after() {
    endTime = Instant.now();
    log.info("结束测试,共耗时:\t{}", ChronoUnit.MILLIS.between(startTime, endTime));

  }

.....
}

1. runAsync, 创建一个简单的没有返回值的Future

这里使用线程池异步调用前面的run方法, 传入的第三个参数1,也就是这个异步线程等待1秒,
接着又在主线程中sleep(3)等待3秒,看下运行结果

/**
   * 创建一个简单的没有返回值的CompleteFuture
   * runAsync 没有返回值
   * 耗时 3038
   */
  @Test
  public void fun01() throws InterruptedException {
    CompletableFuture.runAsync(() -> run(10, 5,1), pool);
    sleep(3);
  }

可以看出主线程和异步线程分别执行了,总耗时取决于等待时间3秒的主线程
java8的CompletableFuture的使用

这个时候如果吧两者调换,主线程等待1秒,异步run方法等待3秒看看

 @Test
  public void fun01() throws InterruptedException {
    CompletableFuture.runAsync(() -> run(10, 5,3), pool);
    sleep(1);
  }

可以看出,主线程等待1秒后,就结束方法了,因为是直接用单元测试测的,主线程跑完,其他线程也就关闭了,异步run方法没能执行完毕
java8的CompletableFuture的使用

这样的话该怎么办呢?如果是用传统的Future future = pool.submit(runable), 使用future.get(),主线程会等到callback执行完毕以后,拿到返回值了再跑完主流程, CompletableFuture如果也要实现这种效果,也是用get()

  @Test
  public void fun01() throws InterruptedException, ExecutionException {
    CompletableFuture<Void> f = CompletableFuture.runAsync(() -> run(10, 5, 3), pool);
    sleep(1);
    f.get();
  }

get方法也就类似于传统的future去阻塞获取异步线程的返回值
java8的CompletableFuture的使用

2. supplyAsync 创建一个简单的没有返回值的CompleteFuture

上一个方法使用到了get,但是上一个方法是没有返回值的,这个方法使用get来获取返回值

/**
   * 创建一个简单的没有返回值的CompleteFuture
   * supplyAsync 有返回值,类似Future
   * CompletableFuture使用supplyAsync(),会异步调用方法,调用get()方法,会在获取到future返回值后,再执行get()以后的程序
   * 耗时 3040
   */
  @Test
  public void fun02() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> run(10, 5,1), pool);

    log.info("----get阻塞开始----");
    Integer i = f.get();
    log.info("----get阻塞结束----");

   log.info("获取的值是:{}",i);
  }

java8的CompletableFuture的使用

3. 监听future返回,thenApply, thenAccept, thenRun

这三个方法很类似

 /**
   * thenApply()  监听future返回,调用Future方法对返回值进行修改和操作,这个操作有返回值,比如转换类型
   * thenAccept() 监听future返回,调用Consumer处理返回值,处理的结果没有返回值,比如打印结果,这样的话最终的CompletableFuture也拿不到返回值
   * thenRun()    监听future返回,然后自己自定义处理
   * 耗时 7037 ,线程都是同一个demo-pool-0
   */
  @Test
  public void fun03() throws InterruptedException, ExecutionException {
    CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> run(10, 5, 1), pool)
            .thenApply(i ->String.valueOf(i).concat("abc"))
            .thenAccept(s ->log.info("thenAccept接收的结果是:{}", s))
            .thenRun(() ->log.info("在业务后处理其他的流程"))
            .thenAccept(s -> log.info("全部结束"));
    f.get();
  }

首先supplyAsync调用run方法,计算10/5的值,得到Integer,
然后thenApply中将Integer转为String,再拼接一个"abc",
然后thenRun做一些自己的业务处理

idea中可以看到每个方法得到的结果类型, thenApply是有返回值的,参数是Function , thenAccept是没有返回值的,参数是Comsumer
java8的CompletableFuture的使用

java8的CompletableFuture的使用

4. thenApplyAsync, thenAcceptAsync, thenRunAsync

上面三个方法还有三个类似的方法,与thenApply,thenAccept,thenRun相比,是另起一个线程执行

/**
   * thenApplyAsync()  异步 调用Future方法对返回值进行修改和操作,这个操作有返回值,比如转换类型
   * thenAcceptAsync() 异步 调用Consumer处理返回值,处理的结果没有返回值,比如打印结果  最终CompletableFuture也拿不到返回值
   * thenRunAsync()    异步
   * <p>
   * 与thenApply,thenAccept,thenRun相比,是另起一个线程执行,但是因为他们都需要拿到上一个方法的值,所以这里异步开启线程与同步耗时是一样的
   * 耗时:7045  线程不一样,都开启了新线程
   */
  @Test
  public void fun04() throws Exception {
    CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> run(10, 5, 1), pool)
            .thenApplyAsync(i -> {
              log.info("thenApplyAsync休眠3秒");
              try {
                TimeUnit.SECONDS.sleep(3);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
              return String.valueOf(i).concat("abc");
            }, pool)
            .thenAcceptAsync(s -> {
              log.info("thenAcceptAsync接收的结果是:{},休眠2秒", s);
              try {
                TimeUnit.SECONDS.sleep(2);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
            }, pool)
            .thenRunAsync(() -> {
              log.info("thenRunAsync开一个线程,休眠1秒");
              try {
                TimeUnit.SECONDS.sleep(1);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
            }, pool)
            .thenAccept(s -> log.info("全部结束"));

    f.get();

  }

根据日志可以看出, 他们分别开启了一个线程去执行, 但是由于他们都需要拿到上一个方法的返回值,所以这里和同步的效果是差不多的
java8的CompletableFuture的使用

5 处理异常exceptionally

在获取另一个线程的执行结果时,有可能异步处理抛出异常,使用Java8的CompletableFuture就可以很好的解决这个问题

这里异步调用run2方法,这个方法当传入的参数是偶数会跑出除零异常, IntStream.range(1, 11)可以获取1到10的整数, 将其分别调用run2

/**
   * 处理异常 ,传入1到10的数调用run2(),当传入的是偶数会抛出异常
   * exceptionally 对抛出的异常进行处理
   */
  @Test
  public void fun05() {
    //流式处理,从1到10异步调用run2(),将future都存到list中
    List<CompletableFuture<Integer>> futures = IntStream.range(1, 11)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> run2(i), pool).exceptionally(e -> {
               log.info("抛出了一个异常:{}", e.getCause().toString());
              //返回一个默认值
              return 0;
            }))
            .collect(Collectors.toList());
    //获取值
    String res = futures.stream()
            .map(f -> {
              try {
                return f.thenApply(String::valueOf).get();
              } catch (Exception e) {
                e.printStackTrace();
              }
              return "";
            }).collect(Collectors.joining(" , ", "", ""));
    System.out.println("res = " + res);
  }

如果报错的话就返回0
java8的CompletableFuture的使用

6 thenCompose 多层结构的future返回一个结果,对应的是java8的flatmap

如果CompletableFuture返回的值,又是一个CompletableFuture,这样层层嵌套的话,获取值只能get().get(), 但是使用thenCompose的话就可以把这些值拉到同一个维度来获取

 /**
   * 多层结构的future返回一个结果,对应的是java8的flatmap
   * thenCompose
   */
  @Test
  public void fun06() throws ExecutionException, InterruptedException {
    //第一个层future,调用了一次方法,直接返回"aaa"
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "aaa", pool);

    //future中又异步调用了一次方法(入参s拼接一个"bbb")
    CompletableFuture<CompletableFuture<String>> f2 = f1.thenApply(s -> CompletableFuture.supplyAsync(() -> s.concat("bbb"), pool));
    //第一种获取值的方法
    System.out.println("f2 = " + f2.get().get());
    //使用thenCompose
    CompletableFuture<String> f3 = f1.thenCompose(s -> CompletableFuture.supplyAsync(() -> s.concat("bbb"), pool));
    System.out.println("f3 = " + f3.get());

  }

java8的CompletableFuture的使用

7 henCombine与thenAcceptBoth, 合并两个future

如果我们同时有两个需要异步操作的流程,那么我们就需要使用两个CompletableFuture, 那怎么把这两个CompletableFuture合并,就需要这两个方法

 /**
   * thenCombine与thenAcceptBoth
   * thenCombine    合并两个future,对两个返回值进行处理,有返回值
   * thenAcceptBoth 同时接收两个future返回值,合并成一个future,对两个返回值进行处理,没有返回值
   * 共耗时:	3043  f1耗时2秒和f3耗时3秒
   */
  @Test
public void fun07() throws ExecutionException, InterruptedException {
    //休眠2秒的"hello"
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> run3("hello", 2), pool);

    //休眠3秒的"zgd"
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> run3("zgd", 3), pool);

    //合并两个future
    CompletableFuture<String> f3 = f1.thenCombine(f2, (s, s2) -> {
      log.info("合并两个返回值:\ts1:{}\ts2:{}", s, s2);
      return s.concat(s2);
    });

    //合并两个future
    CompletableFuture<Void> f4 = f1.thenAcceptBoth(f2, (s1, s2) -> {
      log.info("s1: {}\t s2: {}",s1,s2);
    });

    log.info("f3: {}",f3.get());
    f4.get();
  }

thenAcceptBoth的话没有返回值,也就是他的参数是Comsumer, thenCombine有返回值,你可以把两个值进行合并,两个整数进行加减运算, 而且虽然一个是线程等待2秒, 一个线程等待3秒,但是因为是开启两个线程同时执行,所以拿到合并结果的时候是等待3秒
java8的CompletableFuture的使用

8 applyToEither与acceptEither 获取最先返回的future

 /**
   * applyToEither与acceptEither
   * applyToEither    取2个future中最先返回的,有返回值
   * acceptEither     取2个future中最先返回的,无返回值
   * 共耗时:	2035    拿到第一个返回的值后,程序就执行完毕了
   */
  @Test
  public void fun08() throws ExecutionException, InterruptedException {
    //休眠2秒的"hello"
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> run3("hello", 2), pool);

    //休眠3秒的"zgd"
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> run3("zgd", 3), pool);

    //获取最先返回的,转成大写
    CompletableFuture<String> f3 = f1.applyToEither(f2, s -> s.toUpperCase());
    System.out.println(LocalTime.now().toString() + "\tf3:最先返回的是\t" + f3.get());

    //获取最先返回的,转成大写
    CompletableFuture<Void> f4 = f1.acceptEither(f2, s -> log.info("f4:最先返回的是:\t{}", s.toUpperCase()));
    f4.get();
  }

一个等待2秒.一个等待3秒, 最终耗时2秒, 3秒还没来得及返回就结束了
java8的CompletableFuture的使用

9 allOf和anyOf 也是获取最快的future

和上一个方法不同,是从一个CompleteableFuture数组里面,获取最快返回的future

  • anyOf:
/**
   * allOf与anyOf
   * allOf    在一个CompletableFuture数组中,等待所有future返回
   * anyOf    在一个CompletableFuture数组中,拿到最快的future返回
   * 耗时 1053  f1.get()在第一个future返回时,就轮询结束
   */
  @Test
  public void fun11() throws ExecutionException, InterruptedException {

    //生成一个休眠时间从1到10秒的future的list
    List<CompletableFuture<String>> futures = IntStream.range(1, 11)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> run3(String.format("sleep:%d", i), i), pool))
            .collect(Collectors.toList());

    CompletableFuture<Object> f1 = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{}));

    f1.thenRun(() -> {
      log.info("最快的future已经执行完成");

      try {
        log.info("f1.get():{}", f1.get());
      } catch (Exception e) {
        e.printStackTrace();
      }

    });

    log.info("----get轮询开始----");
    f1.get();
    log.info("----get轮询结束----");

  }

1秒中就结束了
java8的CompletableFuture的使用
但是这里需要注意的是: 如果遍历Futures数组, 把里面的future调用get()方法的话,跟前面说的一样,会被阻塞直到获取返回值, 所以整个流程会等所有的Future都有返回以后才结束

 @Test
  public void fun11() throws ExecutionException, InterruptedException {

    //生成一个休眠时间从1到10秒的future的list
    List<CompletableFuture<String>> futures = IntStream.range(1, 11)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> run3(String.format("sleep:%d", i), i), pool))
            .collect(Collectors.toList());

    CompletableFuture<Object> f1 = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{}));

    f1.thenRun(() -> {
      log.info("最快的future已经执行完成");
      futures.stream().forEach(f -> {
        try {
          log.info("result:\t{}", f.get());
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
//      try {
//        log.info("f1.get():{}", f1.get());
//      } catch (Exception e) {
//        e.printStackTrace();
//      }

    });

    log.info("----get轮询开始----");
    f1.get();
    log.info("----get轮询结束----");

  }

时间为10秒

java8的CompletableFuture的使用

  • allOf
    等待所有future返回
/**
   * allOf与anyOf
   * allOf    在一个CompletableFuture数组中,等待所有future返回      没有返回值
   * anyOf    在一个CompletableFuture数组中,拿到最快的future返回   有返回值
   * 耗时 10047  f1.get()在所有的futures都返回时,才执行完毕
   */
  @Test
  public void fun10() throws ExecutionException, InterruptedException {

    //生成一个休眠时间从1到10秒的future的list
    List<CompletableFuture<String>> futures = IntStream.range(1, 11)
            .mapToObj(i -> CompletableFuture.supplyAsync(() -> run3(String.format("sleep:%d", i), i), pool))
            .collect(Collectors.toList());

    CompletableFuture<Void> f1 = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));

    f1.thenRun(() -> {
      log.info("所有的future已经执行完成");
      futures.stream().forEach(f -> {
        try {
          log.info("result:\t{}", f.get());
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
    });


    log.info("----get轮询开始----");
    f1.get();
    log.info("----get轮询结束----");

  }

java8的CompletableFuture的使用