fork/join 分支/合并框架和自动机制拆分流Spliterator
利用fork/join求和代码分析
// 集成RecursiveTask用来创建可以用于分支/合并框架的任务
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
// 不再将任务分解为子任务的数组大小
public static final long THRESHOLD = 10_000;
// 需要求和的数组
private final long[] numbers;
// 子任务处理数组的起始和终止位置
private final int start;
private final int end;
// 构造函数用于创建主任务
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
// 用于递归方式为主任务创建子任务
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
// 这里举一个用分支/合并 框架的实际例子,用这个框架为一个数字范围(这里用一个 long[]数组表示)求和
@Override
protected Long compute() {
// 该任务用于求和的部分 的大小
int length = end - start;
// 小于阈值顺序计算结果
if (length <= THRESHOLD) {
return computeSequentially();
}
// 创建子任务为数组的前一半求和
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
// 利用forkjoinpool线程异步执行新创建的子任务
leftTask.fork();
// 创建一个子任务用于为数组后半部分求和
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
// 同步执行第二个子任务,有可能进一步进行递归划分
Long rightResult = rightTask.compute();
// 读取第一个子任务的执行结果,如果没有执行完成就等待
Long leftResult = leftTask.join();
// 该任务的结果是两个子任务的结果的组合
return leftResult + rightResult;
}
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
// 现在编写一个方法来并行对前n个自然数求和就很简单了。你只需把想要的数字数组传给 ForkJoinSumCalculator的构造函数
public static long forkJoinSum(long n) {
// 这里用了一个LongStream来生成包含前n个自然数的数组
long[] numbers = LongStream.rangeClosed(1, n).toArray();
// 然后创建一个ForkJoinTask (RecursiveTask的父类),并把数组传递给ForkJoinSumCalculator的公共构造函数
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
// 创建了一个新的ForkJoinPool,并把任务传给它的调用方法 。在 ForkJoinPool中执行时,最后一个方法返回的值就是ForkJoinSumCalculator类定义的任务结果
return FORK_JOIN_POOL.invoke(task);
}
}
有几点需要注意:
1.一个任务可以分解成多个独立的子任务,才能让性能在并行化时 有所提升
2.不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直 接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。
3.每个子任务都必须等待另一个子任务完成才能启动
自动机制拆分流Spliterator
public interface Spliterator<T> { // T是Spliterator遍历的元素的类型
// tryAdvance方法的行为类似于普通的 Iterator
boolean tryAdvance(Consumer<? super T> action);
// trySplit是专为Spliterator接口设计的,因为它可以把一些元素划出去分 给第二个Spliterator(由该方法返回),让它们两个并行处理。
Spliterator<T> trySplit();
// estimateSize方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值 也有助于让拆分均匀一点
long estimateSize();
// characteristics方法,它将返回一个int,代表Spliterator本身特性集的编码。使用Spliterator的客户可以用这些特性来更好地控制和 优化它的使用。
int characteristics(); }
递归拆分过程
Spliterator的特性 :
ORDERED 元素有既定的顺序(例如List),因此Spliterator在遍历和划分时也会遵循这一顺序
DISTINCT 对于任意一对遍历过的元素x和y,x.equals(y)返回false
SORTED 遍历的元素按照一个预定义的顺序排序
SIZED 该Spliterator由一个已知大小的源建立(例如Set),因此estimatedSize()返回的是准确值
NONNULL 保证遍历的元素不会为null IMMUTABL E Spliterator的数据源不能修改。这意味着在遍历时不能添加、删除或修改任何元素
CONCURRE NT 该Spliterator的数据源可以被其他线程同时修改而无需同步
SUBSIZED 该Spliterator和所有从它拆分出来的Spliterator都是SIZED