Fork_Join - Java多线程编程
线程池可以高效的执行大量的小任务:
Fork/Join线程可以执行一种特殊的任务:
1. 可以把一个大任务拆成多个小任务并行的执行
2. Fork/Join是在JDK1.7
例如我们计算一个大数组的和:
假设有一千万个元素
我们就可以进行并行计算,然后把两个结果加起来就是最终的结果.
如果拆成两个数组以后,每个数组仍然很大,我们还可以进一步拆分,例如拆分四个,我们就可以让四核的CPU
并行的计算
Fork/Join的任务就是把大任务不断地拆成小任务
需要继承JDK的RecursiveTask的类,然后复写compute方法
在compute方法内部我们需要把一个大任务拆分多个任务,然后调用invokeAll()这个方法,同时运行
两个小任务,当两个任务都运行结束以后,invokeAll才会返回,然后我们通过join方法获得返回结果,
最后我们把两个结果加起来返回
package com.leon.day05;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* 求和的任务继承自RecursiveTask,返回值类型是null类型
* @author Leon.Sun
*
*/
class SumTask extends RecursiveTask<Long> {
// 我们定义了一个常量,用它来指定我们如何来分解任务
static final int THRESHOLD = 500;
long[] array;
int start;
int end;
// 当我们创建一个SumTask的时候,我们传入一个数组,传入start,end
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
// 如果任务足够小我们就直接进行计算
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += this.array[i];
try {
// 在求和的时候我们使用Thread.sleep()中断两毫秒
// 这样我们就可以看到forkJoin执行的时间
Thread.sleep(2);
} catch (InterruptedException e) {
}
}
// 返回计算的结果
return sum;
}
// 任务太大,一分为二,把它拆成两个子任务
int middle = (end + start) / 2;
System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start,
end, start, middle, middle, end));
// 我们就获得了subTask1和subTask2
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subTask2 = new SumTask(this.array, middle, end);
// 紧接着我们调用invokeAll方法,同时执行这两个子任务
invokeAll(subtask1, subTask2);
// 然后我们通过join方法获得两个子任务的方法
Long subresult1 = subtask1.join();
Long subresult2 = subTask2.join();
// 最后把两个加起来返回
Long result = subresult1 + subresult2;
System.out.println("result = " + subresult1 + "+" + subresult2
+ " ==> " + result);
return result;
}
}
/**
* 在这个例子中,我们对一个大数组进行求和
* @author Leon.Sun
*
*/
public class ForkJoinTaskSample {
static Random random = new Random(0);
static long random() {
return random.nextInt(1000);
}
public static void main(String[] args) throws Exception {
// 创建1000个随机数组成的数组
// 创建一个包含1000个元素的随机数组
// 由于这个数组包含了一千个元素,所以我们在不使用forkJoin的时候,它大概需要两秒钟的时间,来完成求和
long[] array = new long[1000];
// 创建一个期望求和的值
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random();
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
// fork/join
// 紧接着我们创建一个ForkJoinTask
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
// 然后我们通过ForkJoinPool.commonPool获得一个ForkJoinPool
// 然后通过invoke方法执行这个任务
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
// 我们打印任务执行的时间
System.out.println("Fork/join sum: " + result + " in "
+ (endTime - startTime));
}
}
Expected sum: 492164
split 0~1000 ==> 0~500, 500~1000
result = 251591+240573 ==> 492164
Fork/join sum: 492164 in 1088
static final int THRESHOLD = 500;
这个是表示拆分的大小 每份都是500
Expected sum: 492164
split 0~1000 ==> 0~500, 500~1000
split 500~1000 ==> 500~750, 750~1000
split 0~500 ==> 0~250, 250~500
result = 120306+120267 ==> 240573
result = 127611+123980 ==> 251591
result = 251591+240573 ==> 492164
Fork/join sum: 492164 in 540
Fork/Join在JDK的内部也有一些应用:
java.util.Arrays.paralleSort(array);
这里就是通过Fork/Join把一个大数组进行并行排序,在多核CPU下就可以大大的提高排序的速度.
1. Fork/Join 是一种基于分治的算法:
首先把大任务分解成小任务,然后再合并小任务的结果
2. ForkJoinPool线程池可以把一个大任务分拆成小任务并行执行
3. 我们实现一个任务类必须继承自RecursiveTask/RecursiveAction
这两个接口的区别是RecursiveTask有返回值,RecursiveAction没有返回值
4. 我们使用Fork/Join模式可以进行并行计算提高效率