Java并发编程(5)-Executor线程调度框架解读
文章目录
Executor框架是在JDK5之后引入的一种线程调度类的集合,这些类的集合可以被统称为线程调度框架,本文将介绍Executor线程调度框架、各类线程池、Runnable和Callable< T >任务调度接口以及如何使用线程调度框架构造一个并发程序,最后介绍一下ExecutorCompletionService。
本文总结自《Java并发编程实践》第六章 任务执行 ,以及一些相关博客。
一、Executor线程调度框架
1.1、什么是线程调度框架
Executor框架
是在JDK5之后引入的一种线程调度类的集合,这些类的集合可以被统称为线程调度框架,在这个框架体系中,任务的执行最小单元不是线程(Thread
),而是任务(Task
);而Executor可视为任务的执行器,它的作用是执行任务,进行任务的调度。常用的线程调度类和接口有:
Executors工厂类、Executor接口、ExecutorService接口、ExecutorCompletionService类、Runnable任务接口、Callable< T >响应式任务接口、Future< T >任务状态类、newFixedThreadPool定长线程池、newCachedThreadPool可缓存线程池、newSingleThreadExecutor单一线程池、newScheduledThreadPool可调度线程池。以上可简单地用结构图表示:
1.2、Executors
提供了一系列静态工厂方法用于创建各种线程池,可返回一个Executor对象或者ExecutorService对象,用以执行任务。
1.3、Executor
一个接口,其定义了一个接收Runnable对象的方法executor(Runnable command),用以执行Runabl任务。
1.4、ExecutorService
继承于Executor,是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法。
1.5、ScheduledThreadPoolExecutor
ScheduledExecutorService的实现,一个可定时调度任务的线程池,由Executors.newScheduledThreadPool返回,可用来代替Timer进行定时任务调度。
二、各类线程池
2.1、newFixedThreadPool
创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程;如果线程池中的某个线程由于异常而结束时,线程池就会再补充一条新线程。原理是有界的阻塞队列。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
new LinkedBlockingQueue<Runnable>());
}
2.2、newCachedThreadPool
创建可缓存的线程池,如果线程池中的线程在60秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一条线程。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
//使用同步队列,将任务直接提交给线程
new SynchronousQueue<Runnable>());
}
2.3、newScheduledThreadPool
创建一个可延迟执行或定期执行的线程池。
public class HeartBeat {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
Runnable task = new Runnable() {
public void run() {
System.out.println("HeartBeat.........................");
}
};
executor.scheduleAtFixedRate(task,5,3, TimeUnit.SECONDS); //5秒后第一次执行,之后每隔3秒执行一次
}
}
2.4、newSingleThreadExecutor
创建一个单线程的Executor,如果该线程因为异常而结束就新建一条线程来继续执行后续的任务。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
//corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
三、Runnable和Callable< T >任务调度接口
3.1、Runnable接口
每个任务调度接口都是另起一个线程作业,其中Runnable接口中的run方法不能返回参数,所以也称为无响应任务接口,结合上面的内容,我们来看以下简单的使用:
@Test
public void demo1(){
//使用Executors工厂类创建含10个线程的Executor执行器
Executor executor = Executors.newFixedThreadPool(10);
//创建一个无响应式任务
Runnable task = new Runnable(){
public void run(){
//具体任务实现
System.out.println("任务已执行");
}
};
//执行任务
executor.execute(task);
}
结果:
3.2、Callable< T >接口和Future接口
Callable< T >接口用以实现有返回值的任务,需要注意的是,应该声明ExecutorService 接口类型来提交callable任务,并且获得了当前任务的状态声明Future对象,通过它获取到任务返回的值,如下:
@Test
public void demo2() throws ExecutionException, InterruptedException {
//使用Executors工厂类创建含10个线程的ExecutorService执行器
ExecutorService executor = Executors.newFixedThreadPool(10);
//创建一个响应式任务
Callable<Integer> task = new Callable<Integer>(){
public Integer call(){
//执行有返回值的任务
int i = 0;
return ++i;
}
};
//提交任务,获得任务状态类Futrue
Future<Integer> future = executor.submit(task);
//获得Future中的任务返回的响应值
Integer response = future.get();
System.out.println("任务执行,并返回了:"+response);
}
四、使用线程调度框架构造一个并发程序
4.1、单线程模式
结合上述知识,我们可以构造一个简单的并发程序了。设想这样一个情景:小明早上起床了,他去煮水。。。
如果是单线程程序,比如只用main()方法执行,小明只能等水煮完了,才能去看书,听歌,如以下代码:
public static void main(String[] args) {
//第一个循环执行10次,其中i表示小明正在煮水的进度
for (int i = 0; i < 10; i++) {
System.out.println("煮水中:"+i);
}
//第二个循环,表示看书
for (int i = 0; i < 5; i++) {
System.out.println("看书中:"+i);
}
//第三个循环,表示听音乐
for (int i = 0; i < 5; i++) {
System.out.println("听歌中: "+i);
}
//...
}
4.2、多线程调度模式-使用Runnable
上面的就是典型的单线程模式了,所有工作都要从头执行到尾,可是在现实中,小明可以变煮水边看书边听歌,这样就不会浪费时间,所以这里就要开启至少3个线程去做这3件事情:
@Test
public void demo3() throws ExecutionException, InterruptedException {
//使用Executors工厂类创建含3个线程的ExecutorService执行器
ExecutorService executor = Executors.newFixedThreadPool(3);
//创建一个任务1-煮水
Runnable task1 = new Runnable(){
public void run(){
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+"煮水中: "+i);
}
}
};
//创建一个任务2-看书
Runnable task2 = new Runnable(){
public void run(){
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+"看书中: "+i);
}
}
};
//创建一个任务3-听歌
Runnable task3 = new Runnable(){
public void run(){
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+"听歌中: "+i);
}
}
};
//提交3个任务给线程池去分配线程执行
executor.execute(task1);
executor.execute(task2);
executor.execute(task3);
}
可以看出,Executor线程调度框架分配了3个线程去分别执行了这3个任务,使得程序变成了并发的,提高了工作的效率。
4.3、多线程调度模式-使用Callable< T >
当然,我们也可以使用Callable< T >任务接口来完成上述的多线程调度例子:
@Test
public void fun14() throws ExecutionException, InterruptedException {
//使用Executors工厂类创建含3个线程的ExecutorService执行器
ExecutorService executor = Executors.newFixedThreadPool(3);
//创建一个任务1-煮水
Callable<Integer> task1 = new Callable<Integer>(){
public Integer call(){
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName()+"煮水中 :"+i);
}
return null;
}
};
//创建一个任务2-看书
Callable<Integer> task2 = new Callable<Integer>(){
@Override
public Integer call(){
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+"看书中 :"+i);
}
return null;
}
};
//创建一个任务3-听歌
Callable<Integer> task3 = new Callable<Integer>(){
@Override
public Integer call(){
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName()+"听歌中 :"+i);
}
return null;
}
};
//提交3个任务给线程池去分配线程执行并获得线程状态对象
Future<Integer> future1 = executor.submit(task1);
Future<Integer> future2 = executor.submit(task2);
Future<Integer> future3 = executor.submit(task3);
}
五、ExecutorCompletionService
实现了CompletionService,将执行完成的任务放到阻塞队列中,通过take或poll方法来获得执行结果Future,然后调用Future的get方法即可获得响应的结果了。
@Test
public void demo4() throws InterruptedException, ExecutionException {
CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();
//获得含有缓存线程池的Executor
ExecutorService executor = Executors.newCachedThreadPool();
//创建一个ExecutorCompletionService,将executor交给它管理
ExecutorCompletionService completionService = new ExecutorCompletionService(executor);
for (int i = 0; i < 10; i++) {
int result = i;
//创建一个响应式任务,循环执行10次
Callable<Object> task = new Callable<Object>(){
public Object call() throws InterruptedException {
return result;
}
};
//提交任务
completionService.submit(task);
//将结果放入集合中
Object o = completionService.take().get();
list.add((Integer)o);
}
//将集合遍历
for (Object o : list) {
System.out.print((Integer)o+"\t");
}
}
六、ExecutorService的生命周期管理
ExecutorService继承自Executor接口,定义了一些生命周期的方法:
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
}
6.1、shutdown方法
这个方法用于会强制关闭ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务。
6.2、shutdownNow方法
这个方法会强制关闭ExecutorService,它将取消所有运行中的任务和在工作队列中等待的任务,这个方法返回一个List列表,列表中返回的是等待在工作队列中的任务。
6.3、isShutdown方法
这个方法在ExecutorService关闭后返回true,否则返回false。
6.4、isTerminated方法
这个方法会校验ExecutorService当前的状态是否为“TERMINATED”即关闭状态,当为关闭状态时时返回true否则返回false。
6.5、awaitTermination方法
这个方法有两个参数,一个是timeout即超时时间,另一个是unit即时间单位。这个方法会使线程等待timeout时长,当超过timeout时间后,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。