JDK 7 中的 Fork/Join 模式

介绍

随着多核芯片逐渐成为主流,大多数软件开发人员不可避免地需要了解并行编程的知识。而同时,主流程序语言正在将越来越多的并行特性合并到标准库或者语言本身之中。我们可以看到,JDK 在这方面同样走在潮流的前方。在 JDK 标准版 5 中,由 Doug Lea 提供的并行框架成为了标准库的一部分(JSR-166)。随后,在 JDK 6 中,一些新的并行特性,例如并行 collection 框架,合并到了标准库中(JSR-166x)。直到今天,尽管 Java SE 7 还没有正式发布,一些并行相关的新特性已经出现在 JSR-166y 中:

  1. Fork/Join 模式;
  2. TransferQueue,它继承自 BlockingQueue 并能在队列满时阻塞“生产者”;
  3. ArrayTasks/ListTasks,用于并行执行某些数组/列表相关任务的类;
  4. IntTasks/LongTasks/DoubleTasks,用于并行处理数字类型数组的工具类,提供了排序、查找、求和、求最小值、求最大值等功能;

其中,对 Fork/Join 模式的支持可能是对开发并行软件来说最通用的新特性。在 JSR-166y 中,Doug Lea 实现 ArrayTasks/ListTasks/IntTasks/LongTasks/DoubleTasks 时就大量的用到了 Fork/Join 模式。读者还需要注意一点,因为 JDK 7 还没有正式发布,因此本文涉及到的功能和发布版本有可能不一样。

Fork/Join 模式有自己的适用范围。如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决。图 1 给出了一个 Fork/Join 模式的示意图,位于图上部的 Task 依赖于位于其下的 Task 的执行,只有当所有的子任务都完成之后,调用者才能获得 Task 0 的返回结果。


图 1. Fork/Join 模式示意图
JDK 7 中的 Fork/Join 模式

可以说,Fork/Join 模式能够解决很多种类的并行问题。通过使用 Doug Lea 提供的 Fork/Join 框架,软件开发人员只需要关注任务的划分和中间结果的组合就能充分利用并行平台的优良性能。其他和并行相关的诸多难于处理的问题,例如负载平衡、同步等,都可以由框架采用统一的方式解决。这样,我们就能够轻松地获得并行的好处而避免了并行编程的困难且容易出错的缺点。

使用 Fork/Join 模式

在开始尝试 Fork/Join 模式之前,我们需要从 Doug Lea 主持的 Concurrency JSR-166 Interest Site 上下载 JSR-166y 的源代码,并且我们还需要安装最新版本的 JDK 6(下载网址请参阅 参考资源)。Fork/Join 模式的使用方式非常直观。首先,我们需要编写一个 ForkJoinTask 来完成子任务的分割、中间结果的合并等工作。随后,我们将这个 ForkJoinTask 交给 ForkJoinPool 来完成应用的执行。

通常我们并不直接继承 ForkJoinTask,它包含了太多的抽象方法。针对特定的问题,我们可以选择 ForkJoinTask 的不同子类来完成任务。RecursiveAction 是 ForkJoinTask 的一个子类,它代表了一类最简单的 ForkJoinTask:不需要返回值,当子任务都执行完毕之后,不需要进行中间结果的组合。如果我们从 RecursiveAction 开始继承,那么我们只需要重载 protected void compute() 方法。下面,我们来看看怎么为快速排序算法建立一个 ForkJoinTask 的子类:


清单 1. ForkJoinTask 的子类

JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式classSortTaskextendsRecursiveAction...{
JDK 7 中的 Fork/Join 模式
finallong[]array;
JDK 7 中的 Fork/Join 模式
finalintlo;
JDK 7 中的 Fork/Join 模式
finalinthi;
JDK 7 中的 Fork/Join 模式
privateintTHRESHOLD=30;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicSortTask(long[]array)...{
JDK 7 中的 Fork/Join 模式
this.array=array;
JDK 7 中的 Fork/Join 模式
this.lo=0;
JDK 7 中的 Fork/Join 模式
this.hi=array.length-1;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicSortTask(long[]array,intlo,inthi)...{
JDK 7 中的 Fork/Join 模式
this.array=array;
JDK 7 中的 Fork/Join 模式
this.lo=lo;
JDK 7 中的 Fork/Join 模式
this.hi=hi;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
protectedvoidcompute()...{
JDK 7 中的 Fork/Join 模式
if(hi-lo<THRESHOLD)
JDK 7 中的 Fork/Join 模式sequentiallySort(array,lo,hi);
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
else...{
JDK 7 中的 Fork/Join 模式
intpivot=partition(array,lo,hi);
JDK 7 中的 Fork/Join 模式coInvoke(
newSortTask(array,lo,pivot-1),newSortTask(array,
JDK 7 中的 Fork/Join 模式pivot
+1,hi));
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
privateintpartition(long[]array,intlo,inthi)...{
JDK 7 中的 Fork/Join 模式
longx=array[hi];
JDK 7 中的 Fork/Join 模式
inti=lo-1;
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
for(intj=lo;j<hi;j++)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
if(array[j]<=x)...{
JDK 7 中的 Fork/Join 模式i
++;
JDK 7 中的 Fork/Join 模式swap(array,i,j);
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式swap(array,i
+1,hi);
JDK 7 中的 Fork/Join 模式
returni+1;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
privatevoidswap(long[]array,inti,intj)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
if(i!=j)...{
JDK 7 中的 Fork/Join 模式
longtemp=array[i];
JDK 7 中的 Fork/Join 模式array[i]
=array[j];
JDK 7 中的 Fork/Join 模式array[j]
=temp;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
privatevoidsequentiallySort(long[]array,intlo,inthi)...{
JDK 7 中的 Fork/Join 模式Arrays.sort(array,lo,hi
+1);
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

清单 1 中,SortTask 首先通过 partition() 方法将数组分成两个部分。随后,两个子任务将被生成并分别排序数组的两个部分。当子任务足够小时,再将其分割为更小的任务反而引起性能的降低。因此,这里我们使用一个 THRESHOLD,限定在子任务规模较小时,使用直接排序,而不是再将其分割成为更小的任务。其中,我们用到了 RecursiveAction 提供的方法 coInvoke()。它表示:启动所有的任务,并在所有任务都正常结束后返回。如果其中一个任务出现异常,则其它所有的任务都取消。coInvoke() 的参数还可以是任务的数组。

现在剩下的工作就是将 SortTask 提交到 ForkJoinPool 了。ForkJoinPool() 默认建立具有与 CPU 可使用线程数相等线程个数的线程池。我们在一个 JUnit 的 test 方法中将 SortTask 提交给一个新建的 ForkJoinPool:


清单 2. 新建的 ForkJoinPool

JDK 7 中的 Fork/Join 模式@Test
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicvoidtestSort()throwsException...{
JDK 7 中的 Fork/Join 模式ForkJoinTasksort
=newSortTask(array);
JDK 7 中的 Fork/Join 模式ForkJoinPoolfjpool
=newForkJoinPool();
JDK 7 中的 Fork/Join 模式fjpool.submit(sort);
JDK 7 中的 Fork/Join 模式fjpool.shutdown();
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式fjpool.awaitTermination(
30,TimeUnit.SECONDS);
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式assertTrue(checkSorted(array));
JDK 7 中的 Fork/Join 模式}

在上面的代码中,我们用到了 ForkJoinPool 提供的如下函数:

  1. submit():将 ForkJoinTask 类的对象提交给 ForkJoinPool,ForkJoinPool 将立刻开始执行 ForkJoinTask。
  2. shutdown():执行此方法之后,ForkJoinPool 不再接受新的任务,但是已经提交的任务可以继续执行。如果希望立刻停止所有的任务,可以尝试 shutdownNow() 方法。
  3. awaitTermination():阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束。

并行快速排序的完整代码如下所示:


清单 3. 并行快速排序的完整代码

JDK 7 中的 Fork/Join 模式packagetests;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式
importstaticorg.junit.Assert.*;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式
importjava.util.Arrays;
JDK 7 中的 Fork/Join 模式
importjava.util.Random;
JDK 7 中的 Fork/Join 模式
importjava.util.concurrent.TimeUnit;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式
importjsr166y.forkjoin.ForkJoinPool;
JDK 7 中的 Fork/Join 模式
importjsr166y.forkjoin.ForkJoinTask;
JDK 7 中的 Fork/Join 模式
importjsr166y.forkjoin.RecursiveAction;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式
importorg.junit.Before;
JDK 7 中的 Fork/Join 模式
importorg.junit.Test;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
classSortTaskextendsRecursiveAction...{
JDK 7 中的 Fork/Join 模式
finallong[]array;
JDK 7 中的 Fork/Join 模式
finalintlo;
JDK 7 中的 Fork/Join 模式
finalinthi;
JDK 7 中的 Fork/Join 模式
privateintTHRESHOLD=0;//Fordemoonly
JDK 7 中的 Fork/Join 模式

JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicSortTask(long[]array)...{
JDK 7 中的 Fork/Join 模式
this.array=array;
JDK 7 中的 Fork/Join 模式
this.lo=0;
JDK 7 中的 Fork/Join 模式
this.hi=array.length-1;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicSortTask(long[]array,intlo,inthi)...{
JDK 7 中的 Fork/Join 模式
this.array=array;
JDK 7 中的 Fork/Join 模式
this.lo=lo;
JDK 7 中的 Fork/Join 模式
this.hi=hi;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
protectedvoidcompute()...{
JDK 7 中的 Fork/Join 模式
if(hi-lo<THRESHOLD)
JDK 7 中的 Fork/Join 模式sequentiallySort(array,lo,hi);
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
else...{
JDK 7 中的 Fork/Join 模式
intpivot=partition(array,lo,hi);
JDK 7 中的 Fork/Join 模式System.out.println(
" pivot="+pivot+",low="+lo+",high="+hi);
JDK 7 中的 Fork/Join 模式System.out.println(
"array"+Arrays.toString(array));
JDK 7 中的 Fork/Join 模式coInvoke(
newSortTask(array,lo,pivot-1),newSortTask(array,
JDK 7 中的 Fork/Join 模式pivot
+1,hi));
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
privateintpartition(long[]array,intlo,inthi)...{
JDK 7 中的 Fork/Join 模式
longx=array[hi];
JDK 7 中的 Fork/Join 模式
inti=lo-1;
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
for(intj=lo;j<hi;j++)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
if(array[j]<=x)...{
JDK 7 中的 Fork/Join 模式i
++;
JDK 7 中的 Fork/Join 模式swap(array,i,j);
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式swap(array,i
+1,hi);
JDK 7 中的 Fork/Join 模式
returni+1;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
privatevoidswap(long[]array,inti,intj)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
if(i!=j)...{
JDK 7 中的 Fork/Join 模式
longtemp=array[i];
JDK 7 中的 Fork/Join 模式array[i]
=array[j];
JDK 7 中的 Fork/Join 模式array[j]
=temp;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
privatevoidsequentiallySort(long[]array,intlo,inthi)...{
JDK 7 中的 Fork/Join 模式Arrays.sort(array,lo,hi
+1);
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicclassTestForkJoinSimple...{
JDK 7 中的 Fork/Join 模式
privatestaticfinalintNARRAY=16;//Fordemoonly
JDK 7 中的 Fork/Join 模式
long[]array=newlong[NARRAY];
JDK 7 中的 Fork/Join 模式Randomrand
=newRandom();
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式@Before
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicvoidsetUp()...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
for(inti=0;i<array.length;i++)...{
JDK 7 中的 Fork/Join 模式array[i]
=rand.nextLong()%100;//Fordemoonly
JDK 7 中的 Fork/Join 模式
}

JDK 7 中的 Fork/Join 模式System.out.println(
"InitialArray:"+Arrays.toString(array));
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式@Test
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicvoidtestSort()throwsException...{
JDK 7 中的 Fork/Join 模式ForkJoinTasksort
=newSortTask(array);
JDK 7 中的 Fork/Join 模式ForkJoinPoolfjpool
=newForkJoinPool();
JDK 7 中的 Fork/Join 模式fjpool.submit(sort);
JDK 7 中的 Fork/Join 模式fjpool.shutdown();
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式fjpool.awaitTermination(
30,TimeUnit.SECONDS);
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式assertTrue(checkSorted(array));
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
booleancheckSorted(long[]a)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
for(inti=0;i<a.length-1;i++)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
if(a[i]>(a[i+1]))...{
JDK 7 中的 Fork/Join 模式
returnfalse;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
returntrue;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

运行以上代码,我们可以得到以下结果:

JDK 7 中的 Fork/Join 模式InitialArray:[46,-12,74,-67,76,-13,-91,-96]
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式pivot
=0,low=0,high=7
JDK 7 中的 Fork/Join 模式array[
-96,-12,74,-67,76,-13,-91,46]
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式pivot
=5,low=1,high=7
JDK 7 中的 Fork/Join 模式array[
-96,-12,-67,-13,-91,46,76,74]
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式pivot
=1,low=1,high=4
JDK 7 中的 Fork/Join 模式array[
-96,-91,-67,-13,-12,46,74,76]
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式pivot
=4,low=2,high=4
JDK 7 中的 Fork/Join 模式array[
-96,-91,-67,-13,-12,46,74,76]
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式pivot
=3,low=2,high=3
JDK 7 中的 Fork/Join 模式array[
-96,-91,-67,-13,-12,46,74,76]
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式pivot
=2,low=2,high=2
JDK 7 中的 Fork/Join 模式array[
-96,-91,-67,-13,-12,46,74,76]
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式pivot
=6,low=6,high=7
JDK 7 中的 Fork/Join 模式array[
-96,-91,-67,-13,-12,46,74,76]
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式pivot
=7,low=7,high=7
JDK 7 中的 Fork/Join 模式array[
-96,-91,-67,-13,-12,46,74,76]

Fork/Join 模式高级特性

使用 RecursiveTask

除了 RecursiveAction,Fork/Join 框架还提供了其他 ForkJoinTask 子类:带有返回值的 RecursiveTask,使用 finish() 方法显式中止的 AsyncAction 和 LinkedAsyncAction,以及可使用 TaskBarrier 为每个任务设置不同中止条件的 CyclicAction。

从 RecursiveTask 继承的子类同样需要重载 protected void compute() 方法。与 RecursiveAction 稍有不同的是,它可使用泛型指定一个返回值的类型。下面,我们来看看如何使用 RecursiveTask 的子类。


清单 4. RecursiveTask 的子类

JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式classFibonacciextendsRecursiveTask<Integer>...{
JDK 7 中的 Fork/Join 模式
finalintn;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式Fibonacci(
intn)...{
JDK 7 中的 Fork/Join 模式
this.n=n;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
privateintcompute(intsmall)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
finalint[]results=...{1,1,2,3,5,8,13,21,34,55,89};
JDK 7 中的 Fork/Join 模式
returnresults[small];
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicIntegercompute()...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
if(n<=10)...{
JDK 7 中的 Fork/Join 模式
returncompute(n);
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式Fibonaccif1
=newFibonacci(n-1);
JDK 7 中的 Fork/Join 模式Fibonaccif2
=newFibonacci(n-2);
JDK 7 中的 Fork/Join 模式f1.fork();
JDK 7 中的 Fork/Join 模式f2.fork();
JDK 7 中的 Fork/Join 模式
returnf1.join()+f2.join();
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

清单 4 中, Fibonacci 的返回值为 Integer 类型。其 compute() 函数首先建立两个子任务,启动子任务执行,阻塞以等待子任务的结果返回,相加后得到最终结果。同样,当子任务足够小时,通过查表得到其结果,以减小因过多地分割任务引起的性能降低。其中,我们用到了 RecursiveTask 提供的方法 fork()join()。它们分别表示:子任务的异步执行和阻塞等待结果完成。

现在剩下的工作就是将 Fibonacci 提交到 ForkJoinPool 了,我们在一个 JUnit 的 test 方法中作了如下处理:


清单 5. 将 Fibonacci 提交到 ForkJoinPool

JDK 7 中的 Fork/Join 模式@Test
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicvoidtestFibonacci()throwsInterruptedException,ExecutionException...{
JDK 7 中的 Fork/Join 模式ForkJoinTask
<Integer>fjt=newFibonacci(45);
JDK 7 中的 Fork/Join 模式ForkJoinPoolfjpool
=newForkJoinPool();
JDK 7 中的 Fork/Join 模式Future
<Integer>result=fjpool.submit(fjt);
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式
//dosomething
JDK 7 中的 Fork/Join 模式
System.out.println(result.get());
JDK 7 中的 Fork/Join 模式}

使用 CyclicAction 来处理循环任务

CyclicAction 的用法稍微复杂一些。如果一个复杂任务需要几个线程协作完成,并且线程之间需要在某个点等待所有其他线程到达,那么我们就能方便的用 CyclicAction 和 TaskBarrier 来完成。图 2 描述了使用 CyclicAction 和 TaskBarrier 的一个典型场景。


图 2. 使用 CyclicAction 和 TaskBarrier 执行多线程任务
JDK 7 中的 Fork/Join 模式

继承自 CyclicAction 的子类需要 TaskBarrier 为每个任务设置不同的中止条件。从 CyclicAction 继承的子类需要重载 protected void compute() 方法,定义在 barrier 的每个步骤需要执行的动作。compute() 方法将被反复执行直到 barrierisTerminated() 方法返回 True。TaskBarrier 的行为类似于 CyclicBarrier。下面,我们来看看如何使用 CyclicAction 的子类。


清单 6. 使用 CyclicAction 的子类

JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式classConcurrentPrintextendsRecursiveAction...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
protectedvoidcompute()...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式TaskBarrierb
=newTaskBarrier()...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
protectedbooleanterminate(intcycle,intregisteredParties)...{
JDK 7 中的 Fork/Join 模式System.out.println(
"Cycleis"+cycle+";"
JDK 7 中的 Fork/Join 模式
+registeredParties+"parties");
JDK 7 中的 Fork/Join 模式
returncycle>=10;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}
;
JDK 7 中的 Fork/Join 模式
intn=3;
JDK 7 中的 Fork/Join 模式CyclicAction[]actions
=newCyclicAction[n];
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
for(inti=0;i<n;++i)...{
JDK 7 中的 Fork/Join 模式
finalintindex=i;
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式actions[i]
=newCyclicAction(b)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
protectedvoidcompute()...{
JDK 7 中的 Fork/Join 模式System.out.println(
"I'mworking"+getCycle()+""
JDK 7 中的 Fork/Join 模式
+index);
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
try...{
JDK 7 中的 Fork/Join 模式Thread.sleep(
500);
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式}
catch(InterruptedExceptione)...{
JDK 7 中的 Fork/Join 模式e.printStackTrace();
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}
;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
for(inti=0;i<n;++i)
JDK 7 中的 Fork/Join 模式actions[i].fork();
JDK 7 中的 Fork/Join 模式
for(inti=0;i<n;++i)
JDK 7 中的 Fork/Join 模式actions[i].join();
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

清单 6 中,CyclicAction[] 数组建立了三个任务,打印各自的工作次数和序号。而在 b.terminate() 方法中,我们设置的中止条件表示重复 10 次计算后中止。现在剩下的工作就是将 ConcurrentPrint 提交到 ForkJoinPool 了。我们可以在 ForkJoinPool 的构造函数中指定需要的线程数目,例如 ForkJoinPool(4) 就表明线程池包含 4 个线程。我们在一个 JUnit 的 test 方法中运行 ConcurrentPrint 的这个循环任务:


清单 7. 运行 ConcurrentPrint 循环任务

JDK 7 中的 Fork/Join 模式@Test
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicvoidtestBarrier()throwsInterruptedException,ExecutionException...{
JDK 7 中的 Fork/Join 模式ForkJoinTaskfjt
=newConcurrentPrint();
JDK 7 中的 Fork/Join 模式ForkJoinPoolfjpool
=newForkJoinPool(4);
JDK 7 中的 Fork/Join 模式fjpool.submit(fjt);
JDK 7 中的 Fork/Join 模式fjpool.shutdown();
JDK 7 中的 Fork/Join 模式}

RecursiveTask 和 CyclicAction 两个例子的完整代码如下所示:


清单 8. RecursiveTask 和 CyclicAction 两个例子的完整代码

JDK 7 中的 Fork/Join 模式packagetests;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式
importjava.util.concurrent.ExecutionException;
JDK 7 中的 Fork/Join 模式
importjava.util.concurrent.Future;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式
importjsr166y.forkjoin.CyclicAction;
JDK 7 中的 Fork/Join 模式
importjsr166y.forkjoin.ForkJoinPool;
JDK 7 中的 Fork/Join 模式
importjsr166y.forkjoin.ForkJoinTask;
JDK 7 中的 Fork/Join 模式
importjsr166y.forkjoin.RecursiveAction;
JDK 7 中的 Fork/Join 模式
importjsr166y.forkjoin.RecursiveTask;
JDK 7 中的 Fork/Join 模式
importjsr166y.forkjoin.TaskBarrier;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式
importorg.junit.Test;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
classFibonacciextendsRecursiveTask<Integer>...{
JDK 7 中的 Fork/Join 模式
finalintn;
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式Fibonacci(
intn)...{
JDK 7 中的 Fork/Join 模式
this.n=n;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
privateintcompute(intsmall)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
finalint[]results=...{1,1,2,3,5,8,13,21,34,55,89};
JDK 7 中的 Fork/Join 模式
returnresults[small];
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicIntegercompute()...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
if(n<=10)...{
JDK 7 中的 Fork/Join 模式
returncompute(n);
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式Fibonaccif1
=newFibonacci(n-1);
JDK 7 中的 Fork/Join 模式Fibonaccif2
=newFibonacci(n-2);
JDK 7 中的 Fork/Join 模式System.out.println(
"forknewthreadfor"+(n-1));
JDK 7 中的 Fork/Join 模式f1.fork();
JDK 7 中的 Fork/Join 模式System.out.println(
"forknewthreadfor"+(n-2));
JDK 7 中的 Fork/Join 模式f2.fork();
JDK 7 中的 Fork/Join 模式
returnf1.join()+f2.join();
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
classConcurrentPrintextendsRecursiveAction...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
protectedvoidcompute()...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式TaskBarrierb
=newTaskBarrier()...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
protectedbooleanterminate(intcycle,intregisteredParties)...{
JDK 7 中的 Fork/Join 模式System.out.println(
"Cycleis"+cycle+";"
JDK 7 中的 Fork/Join 模式
+registeredParties+"parties");
JDK 7 中的 Fork/Join 模式
returncycle>=10;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}
;
JDK 7 中的 Fork/Join 模式
intn=3;
JDK 7 中的 Fork/Join 模式CyclicAction[]actions
=newCyclicAction[n];
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
for(inti=0;i<n;++i)...{
JDK 7 中的 Fork/Join 模式
finalintindex=i;
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式actions[i]
=newCyclicAction(b)...{
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
protectedvoidcompute()...{
JDK 7 中的 Fork/Join 模式System.out.println(
"I'mworking"+getCycle()+""
JDK 7 中的 Fork/Join 模式
+index);
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
try...{
JDK 7 中的 Fork/Join 模式Thread.sleep(
500);
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式}
catch(InterruptedExceptione)...{
JDK 7 中的 Fork/Join 模式e.printStackTrace();
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}
;
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
for(inti=0;i<n;++i)
JDK 7 中的 Fork/Join 模式actions[i].fork();
JDK 7 中的 Fork/Join 模式
for(inti=0;i<n;++i)
JDK 7 中的 Fork/Join 模式actions[i].join();
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicclassTestForkJoin...{
JDK 7 中的 Fork/Join 模式@Test
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicvoidtestBarrier()throwsInterruptedException,ExecutionException...{
JDK 7 中的 Fork/Join 模式System.out.println(
" testingTaskBarrier...");
JDK 7 中的 Fork/Join 模式ForkJoinTaskfjt
=newConcurrentPrint();
JDK 7 中的 Fork/Join 模式ForkJoinPoolfjpool
=newForkJoinPool(4);
JDK 7 中的 Fork/Join 模式fjpool.submit(fjt);
JDK 7 中的 Fork/Join 模式fjpool.shutdown();
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式@Test
JDK 7 中的 Fork/Join 模式JDK 7 中的 Fork/Join 模式
publicvoidtestFibonacci()throwsInterruptedException,ExecutionException...{
JDK 7 中的 Fork/Join 模式System.out.println(
" testingFibonacci...");
JDK 7 中的 Fork/Join 模式
finalintnum=14;//Fordemoonly
JDK 7 中的 Fork/Join 模式
ForkJoinTask<Integer>fjt=newFibonacci(num);
JDK 7 中的 Fork/Join 模式ForkJoinPoolfjpool
=newForkJoinPool();
JDK 7 中的 Fork/Join 模式Future
<Integer>result=fjpool.submit(fjt);
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式
//dosomething
JDK 7 中的 Fork/Join 模式
System.out.println("Fibonacci("+num+")="+result.get());
JDK 7 中的 Fork/Join 模式}

JDK 7 中的 Fork/Join 模式}

运行以上代码,我们可以得到以下结果:

JDK 7 中的 Fork/Join 模式testingTaskBarrier...
JDK 7 中的 Fork/Join 模式I'mworking
02
JDK 7 中的 Fork/Join 模式I'mworking
00
JDK 7 中的 Fork/Join 模式I'mworking
01
JDK 7 中的 Fork/Join 模式Cycleis
0;3parties
JDK 7 中的 Fork/Join 模式I'mworking
12
JDK 7 中的 Fork/Join 模式I'mworking
10
JDK 7 中的 Fork/Join 模式I'mworking
11
JDK 7 中的 Fork/Join 模式Cycleis
1;3parties
JDK 7 中的 Fork/Join 模式I'mworking
20
JDK 7 中的 Fork/Join 模式I'mworking
21
JDK 7 中的 Fork/Join 模式I'mworking
22
JDK 7 中的 Fork/Join 模式Cycleis
2;3parties
JDK 7 中的 Fork/Join 模式I'mworking
30
JDK 7 中的 Fork/Join 模式I'mworking
32
JDK 7 中的 Fork/Join 模式I'mworking
31
JDK 7 中的 Fork/Join 模式Cycleis
3;3parties
JDK 7 中的 Fork/Join 模式I'mworking
42
JDK 7 中的 Fork/Join 模式I'mworking
40
JDK 7 中的 Fork/Join 模式I'mworking
41
JDK 7 中的 Fork/Join 模式Cycleis
4;3parties
JDK 7 中的 Fork/Join 模式I'mworking
51
JDK 7 中的 Fork/Join 模式I'mworking
50
JDK 7 中的 Fork/Join 模式I'mworking
52
JDK 7 中的 Fork/Join 模式Cycleis
5;3parties
JDK 7 中的 Fork/Join 模式I'mworking
60
JDK 7 中的 Fork/Join 模式I'mworking
62
JDK 7 中的 Fork/Join 模式I'mworking
61
JDK 7 中的 Fork/Join 模式Cycleis
6;3parties
JDK 7 中的 Fork/Join 模式I'mworking
72
JDK 7 中的 Fork/Join 模式I'mworking
70
JDK 7 中的 Fork/Join 模式I'mworking
71
JDK 7 中的 Fork/Join 模式Cycleis
7;3parties
JDK 7 中的 Fork/Join 模式I'mworking
81
JDK 7 中的 Fork/Join 模式I'mworking
80
JDK 7 中的 Fork/Join 模式I'mworking
82
JDK 7 中的 Fork/Join 模式Cycleis
8;3parties
JDK 7 中的 Fork/Join 模式I'mworking
90
JDK 7 中的 Fork/Join 模式I'mworking
92
JDK 7 中的 Fork/Join 模式
JDK 7 中的 Fork/Join 模式testingFibonacci
...
JDK 7 中的 Fork/Join 模式forknewthread
for13
JDK 7 中的 Fork/Join 模式forknewthread
for12
JDK 7 中的 Fork/Join 模式forknewthread
for11
JDK 7 中的 Fork/Join 模式forknewthread
for10
JDK 7 中的 Fork/Join 模式forknewthread
for12
JDK 7 中的 Fork/Join 模式forknewthread
for11
JDK 7 中的 Fork/Join 模式forknewthread
for10
JDK 7 中的 Fork/Join 模式forknewthread
for9
JDK 7 中的 Fork/Join 模式forknewthread
for10
JDK 7 中的 Fork/Join 模式forknewthread
for9
JDK 7 中的 Fork/Join 模式forknewthread
for11
JDK 7 中的 Fork/Join 模式forknewthread
for10
JDK 7 中的 Fork/Join 模式forknewthread
for10
JDK 7 中的 Fork/Join 模式forknewthread
for9
JDK 7 中的 Fork/Join 模式Fibonacci
(14)=610

结论

从以上的例子中可以看到,通过使用 Fork/Join 模式,软件开发人员能够方便地利用多核平台的计算能力。尽管还没有做到对软件开发人员完全透明,Fork/Join 模式已经极大地简化了编写并发程序的琐碎工作。对于符合 Fork/Join 模式的应用,软件开发人员不再需要处理各种并行相关事务,例如同步、通信等,以难以调试而闻名的死锁和 data race 等错误也就不会出现,提升了思考问题的层次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。

在实际工作中利用 Fork/Join 模式,可以充分享受多核平台为应用带来的免费午餐。



参考资料

学习


获得产品和技术

  • 访问 Doug Lea 的 JSR 166 站点获得最新的源代码。

  • Sun 公司 网站下载 Java SE 6。