Java中的Fork/Join框架

    Fork/Join是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

   一、 Fork/Join的运行流程图

                        Java中的Fork/Join框架

    二、工作窃取算法

    工作窃取算法是指某个线程从其他队列里窃取任务去执行。在处理一个大任务时,将这个大任务分成若干个小任务,放在不同的队列里,如果某个线程执行完自己所属的队列任务时,线程就会空闲。与其空闲,何不去其他线程队列里去任务执行呢。于是这个线程就会去其他线程队列窃取任务执行。这个队列一般选用双端队列,被窃取任务的线程永远从队尾去取,而窃取任务的线程则去队头去窃取任务。

                    Java中的Fork/Join框架

    工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争

    工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列中只有一个任务。

    三、Fork/Join框架的设计

    1、分割任务:首先我们需要一个fork类来分割一个大任务,知道子任务足够小

    2、执行任务并合并结果:分割的子任务放入到双端队列中,然后几个线程启动分别从双端队列里取任务进行执行。子任务

        执行的结果放到都统一放在一个队列里,然后启动一个线程去合并结果。

    Fork/Join使用两个类完成以上事情:

    ①ForkJoinTask:我们要使用Fork/Join,必须先创建一个ForkJoin任务,它提供fork和join方法的操作。通常情况下,我们不需要直接继承ForkJoinTask,而是继承它的子类

        a、RecursiveAction:用于没有返回结果的任务

        b、RecursiveTask:用于有返回结果的任务

    ②ForkJoinPool:ForkJoinTask任务必须要由ForkJoinPool来执行

    四、Fork/Join框架的应用

使用ForkJoin框架来进行求和

    1、首先要如何分割子任务,我们需要设置一个阈值,也就是每个线程求和的数量少于多少开始进行求和,不然就继续分割子任务,假定我们将阈值设为200000。其次因为我们是求和操作,所以需要继承RecursiveTask来进行返回结果

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class Run {
    public static void main(String[] args) {
        long l = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future<Long> result = forkJoinPool.submit(new Sum(1,3000000000l));
        long l1 = 0;
        try {
            System.out.println(result.get());
            l1 = System.currentTimeMillis();
            System.out.println("ForkJoin框架求和耗时"+(l1-l));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
class Sum extends RecursiveTask<Long>{
    private long start;
    private long end;
    private long THRESHOLD = 200000;
    public Sum(long start,long end){
        if (start<=1) {
            this.start = 1;
            this.end = end;
        }
        else {
            this.start = start;
            this.end = end;
        }
    }
    @Override
    protected Long compute() {
       long sum  = 0;
       if (end-start<=THRESHOLD){
           for (long i=start;i<=end;i++){
               sum+=i;
           }
       }else {
           long middle = (start+end)/2;
           Sum sum1 = new Sum(start,middle);
           sum1.fork();
           Sum sum2 = new Sum(middle+1,end);
           sum2.fork();
           sum = sum1.join()+sum2.join();
       }
       return sum;
    }
}
   可以看到,ForkJoinTask和一般任务的主要区别就在于它需要实现compute方法,在这个方法中,首先要判断求和数量是否足够小,如果小就进行求和操作,不然则继续分割子任务。知道满足条件。