Java并发之线程池

 

目录

0  前言

1  线程池原理

2  如何配置线程

3  优雅的关闭线程池

4  线程池使用示例:

5  线程池之线程复用秘密


 


0  前言

平时接触过多线程开发的童鞋应该都或多或少了解过线程池,之前发布的《阿里巴巴 Java 手册》里也有一条:

Java并发之线程池

可见线程池的重要性。

简单来说使用线程池有以下几个目的:

  • 线程是稀缺资源,不能频繁的创建。
  • 解耦作用;线程的创建于执行完全分开,方便维护。
  • 应当将其放入一个池子中,可以给其他任务进行复用。

1  线程池原理

谈到线程池就会想到池化技术,其中最核心的思想就是把宝贵的资源放到一个池子中;每次使用都从里面获取,用完之后又放回池子供其他人使用,有点吃大锅饭的意思。

那在 Java 中又是如何实现的呢?

在 JDK 1.5 之后推出了相关的 api,常见的创建线程池方式有以下几种:

  • Executors.newCachedThreadPool():无限线程池。
  • Executors.newFixedThreadPool(nThreads):创建固定大小的线程池。
  • Executors.newSingleThreadExecutor():创建单个线程的线程池。

其实看这三种方式创建的源码就会发现:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

实际上还是利用 ThreadPoolExecutor 类实现的。

所以我们重点来看下 ThreadPoolExecutor 是怎么玩的。

首先是创建线程的 api:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 

这几个核心参数的作用:

  • corePoolSize 为线程池的基本大小。
  • maximumPoolSize 为线程池最大线程大小。
  • keepAliveTime 和 unit 则是线程空闲后的存活时间,以及存活时间的单位。
  • workQueue 用于存放任务的阻塞队列。
  • handler 当队列和最大线程池都满了之后的饱和策略。

 补充上面:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

了解了这几个参数再来看看实际的运用。

通常我们都是使用:

threadPool.execute(new Job());

这样的方式来提交一个任务到线程池中,所以核心的逻辑就是 execute() 函数了。

在具体分析之前先了解下线程池中所定义的状态,这些状态都和线程的执行密切相关:

Java并发之线程池

  • RUNNING 自然是运行状态,指可以接受任务执行队列里的任务
  • SHUTDOWN 指调用了 shutdown() 方法,不再接受新任务了,但是队列里的任务得执行完毕。
  • STOP 指调用了 shutdownNow() 方法,不再接受新任务,同时抛弃阻塞队列里的所有任务并中断所有正在执行任务。
  • TIDYING 所有任务都执行完毕,在调用 shutdown()/shutdownNow() 中都会尝试更新为这个状态。
  • TERMINATED 终止状态,当执行 terminated() 后会更新为这个状态。

用图表示为:

Java并发之线程池

然后看看 execute() 方法是如何处理的:

            Java并发之线程池

  1. 获取当前线程池的状态。
  2. 当前线程数量小于 coreSize 时创建一个新的线程运行。
  3. 如果当前线程处于运行状态,并且写入阻塞队列成功。
  4. 双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
  5. 如果当前线程池为空就新创建一个线程并执行。
  6. 如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略。

这里借助《聊聊并发》的一张图来描述这个流程:

                  Java并发之线程池

 

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务(执行新进来的任务而不是任务队列的任务);
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

2  如何配置线程

流程聊完了再来看看上文提到了几个核心参数应该如何配置呢?

有一点是肯定的,线程池肯定是不是越大越好。

通常我们是需要根据这批任务执行的性质来确定的。

  • IO 密集型任务:由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如 CPU 个数 * 2
  • CPU 密集型任务(大量复杂的运算)应当分配较少的线程,比如 CPU 个数相当的大小。

当然这些都是经验值,最好的方式还是根据实际情况测试得出最佳配置。

3  优雅的关闭线程池

有运行任务自然也有关闭任务,从上文提到的 5 个状态就能看出如何来关闭线程池。

其实无非就是两个方法 shutdown()/shutdownNow()

但他们有着重要的区别:

  • shutdown() 执行后停止接受新任务,会把队列的任务执行完毕。
  • shutdownNow() 也是停止接受新任务,但会中断所有的任务,将线程池状态变为 stop。

两个方法都会中断线程,用户可自行判断是否需要响应中断。

shutdownNow() 要更简单粗暴,可以根据实际场景选择不同的方法

我通常是按照以下方式关闭线程池的:

        long start = System.currentTimeMillis();
        for (int i = 0; i <= 5; i++) {
            pool.execute(new Job());
        }

        pool.shutdown();

        while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {
            LOGGER.info("线程还在执行。。。");
        }
        long end = System.currentTimeMillis();
        LOGGER.info("一共处理了【{}】", (end - start));

pool.awaitTermination(1, TimeUnit.SECONDS) 会每隔一秒钟检查一次是否执行完毕(状态为 TERMINATED),当从 while 循环退出时就表明线程池已经完全终止了。

 

4  使用示例:

package ThreadProblem;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
        for (int i = 1; i <=15; i++) {
            MyTask myTask = new MyTask(i);
            executor.execute(myTask);
            Thread.sleep(10);
            System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size()
                    + ",已执行完别的任务数目:" + executor.getCompletedTaskCount());
        }
        System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size()
                + ",已执行完别的任务数目:" + executor.getCompletedTaskCount());
        executor.shutdown();
    }
}

class MyTask implements Runnable {

    private int taskNum;

    public MyTask(int num) {
        this.taskNum = num;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread()+"正在执行task " + taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task " + taskNum + "执行完毕");
    }
}

执行结果: 

Thread[pool-1-thread-1,5,main]正在执行task 1
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
Thread[pool-1-thread-2,5,main]正在执行task 2
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
Thread[pool-1-thread-3,5,main]正在执行task 3
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
Thread[pool-1-thread-4,5,main]正在执行task 4
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
Thread[pool-1-thread-5,5,main]正在执行task 5
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
Thread[pool-1-thread-6,5,main]正在执行task 11
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
Thread[pool-1-thread-7,5,main]正在执行task 12
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
Thread[pool-1-thread-8,5,main]正在执行task 13
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
Thread[pool-1-thread-9,5,main]正在执行task 14
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
Thread[pool-1-thread-10,5,main]正在执行task 15
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
task 1执行完毕
Thread[pool-1-thread-1,5,main]正在执行task 6
task 2执行完毕
Thread[pool-1-thread-2,5,main]正在执行task 7
task 3执行完毕
Thread[pool-1-thread-3,5,main]正在执行task 8
task 4执行完毕
Thread[pool-1-thread-4,5,main]正在执行task 9
task 5执行完毕
Thread[pool-1-thread-5,5,main]正在执行task 10
task 11执行完毕
task 12执行完毕
task 13执行完毕
task 14执行完毕
task 15执行完毕
task 6执行完毕
task 7执行完毕
task 8执行完毕
task 9执行完毕
task 10执行完毕

5  线程池之线程复用秘密 

我们先来看看,创建一个线程池需要哪些参数。

  • corePoolSize 核心线程数大小。当提交一个任务时,如果当前线程数小于corePoolSize,就会创建一个线程。即使其他有可用的空闲线程。

  • runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列:

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等上一个元素被移除之后,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

  • 不同的runnableTaskQueue对线程池运行逻辑有很大影响

  • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了*的任务队列这个参数就没什么效果。

  • keepAliveTime 线程执行结束后,保持存活的时间。

  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

  • RejectedExecutionHandler 线程池队列饱和之后的执行策略,默认是采用AbortPolicy。JDK提供四种实现方式:

    • AbortPolicy:直接抛出异常

    • CallerRunsPolicy :只用调用者所在线程来运行任务

    • DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务

    • DiscardPolicy : 不处理,丢弃掉

  • TimeUnit: keepalive的时间单位,可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

我们来看看 Executors.newCachedThreadPool() 里面的构造:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • corePoolSize 为 0,意味着核心线程数是 0。

  • maximumPoolSize 是 Integer.MAX_VALUE ,意味这可以一直往线程池提交任务,不会执行 reject 策略。

  • keepAliveTime 和 unit 决定了线程的存活时间是 60s,意味着一个线程空闲60s后才会被回收。

  • reject 策略是默认的 AbortPolicy,当线程池超出最大限制时抛出异常。不过这里 CacheThreadPool 的没有最大线程数限制,所以 reject 策略没用。

  • runnableTaskQueue 是 SynchronousQueue。该队列的特点是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。使用该队列是实现 CacheThreadPool 的关键之一。

SynchronousQueue 的详细原理参考这里

https://blog.****.net/yanyan19880509/article/details/52562039

我们看看 CacheThreadPool 的注释介绍,大意是说当有任务提交进来,会优先使用线程池里可用的空闲线程来执行任务,但是如果没有可用的线程会直接创建线程。空闲的线程会保留 60s,之后才会被回收。这些特性决定了,当需要执行很多短时间的任务时,CacheThreadPool 的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool 并不会占用很多资源。

注释简单明了说明了 CacheThreadPool 的特性和适用场景,我们后面在阅读代码的过程中,会对注释的说明有进一步的理解。

终于到了要进入源码的时候,天天看郭神博客让我学到一个技巧,必须带着问题去看阅读,不管是看书还是看代码,这样才能事半功倍。那么问题来了:

  1. CacheThreadPool 如何实现线程保留60s。

  2. CacheThreadPool 如何实现线程复用。

带着这两个问题,去源码里寻找答案吧~

首先我们向线程池提交任务一般用 execute() 方法,我们就从这里入手:

public void execute(Runnable command) {
  if (command == null)
            throw new NullPointerException();
        //1.如果当前存在的线程少于corePoolSize,会新建线程来执行任务。然后各种检查状态
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2.如果task被成功加入队列,还是要double-check
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3.如果task不能加入到队列,会尝试创建线程。如果创建失败,走reject流程
        else if (!addWorker(command, false))
            reject(command);

5  线程池线程复用的秘密

第一步比较简单,如果当前运行的线程少于核心线程,调用 addWorker(),创建一个线程。但是因为 CacheThreadPool 的 corePoolSize 是0,所以会跳过这步,并不会创建核心线程。

关键在第二步,首先判断了线程池是否运行状态,紧接着调用 workQueue.offer() 往对列添加 task 。 workQueue 是一个 BlockingQueue ,我们知道 BlockingQueue.offer() 方法是向队列插入元素,如果成功返回 true ,如果队列没有可用空间返回 false 。 CacheThreadPool 用的是 SynchronousQueue ,前面了解过 SynchronousQueue 的特性,添加到 SynchronousQueue 的元素必须被其他线程取出,才能塞入下一个元素。等会我们再来看看哪里是从 SynchronousQueue 取出元素。这里当任务入队列成功后,再次检查了线程池状态,还是运行状态就继续。然后检查当前运行线程数量,如果当前没有运行中的线程,调用 addWorker() ,第一个参数为 null 第二个参数是 false ,标明了非核心线程。

为什么这里 addWorker() 第一个方法要用null?带着这个疑问,我们来看看 addWorker() 方法:

private boolean addWorker(Runnable firstTask, boolean core) {
     //...这里有一段cas代码,通过双重循环目的是通过cas增加线程池线程个数
     boolean workerStarted = false;
     boolean workerAdded = false;
     Worker w = null;
     try {
         w = new Worker(firstTask);
         final Thread t = w.thread;
      //...省略部分代码
      workers.add(w);
      //...省略部分代码
      workerAdded=true;
      if (workerAdded) {
        t.start();
        workerStarted = true;
        }
 }

源代码比较长,这里省略了一部分。过程主要分成两步,第一步是一段 cas 代码通过双重循环检查状态并为当前线程数扩容 +1,第二部是将任务包装成 worker 对象,用线程安全的方式添加到当前工作 HashSet() 里,并开始执行线程。

终于读到线程开始执行的地方了,里程碑式的胜利啊同志们!

但是我们注意到,task 为 null ,Worker 里面的 firstTask 是 null ,那么 wokrer thread 里面是怎么工作下去的呢?继续跟踪代码,Worker 类继承 Runnable 接口,因此 worker thread start 后,走的是 worker.run()方法:

public void run() {
    runWorker(this);
}

继续进入 runWorker() 方法:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
      //省略代码
            while (task != null || (task = getTask()) != null) {
                //..省略
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (Exception x) {
                        thrown = x; throw x;
                    }
                //省略代码
            }
        //省略代码
    }

可以看到这里判断了 firstTask 如果为空,就调用 getTask() 方法。getTask() 方法是从 workQueue 拉取任务。所以到这里之前的疑问就解决了,调用 addWorker(null,false) 的目的是启动一个线程,然后再 workQueue 拉取任务执行。继续跟踪 getTask() 方法:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        //..省略

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
       //..省略

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

终于看到从 workQueue 拉取元素了。 CacheThreadPool 构造的时候 corePoolSize 是 0,allowCoreThreadTimeOut 默认是 false ,因此 timed 一直为 true ,会调用 workQueue.poll() 从队列拉取一个任务,等待 60s, 60s后超时,线程就会会被回收。如果 60s 内,进来一个任务,会发生什么情况?任务在 execute() 方法里,会被 offer() 进 workQueue ,因为目前队列是空的,所以 offer 进来后,马上会被阻塞的 worker.poll() 拉取出来,然后在 runWorker() 方法里执行,因为线程没有新建所以达到了线程的复用。至此,我们已经明白了线程复用的秘密,以及线程保留 60s 的实现方法。回到 execute() 方法,还有剩下一个逻辑

//3.如果task不能加入到队列,会尝试创建线程。如果创建失败,走reject流程
else if (!addWorker(command, false))
    reject(command);

因为 CacheThreadPool 用的 SynchronousQueue ,所以没有空闲线程, SynchronousQueue 有一个元素正在被阻塞,那么就不能加入到队列里。会走到 addWorker(commond,false) 这里,这个时候因为就会新建线程来执行任务。如果 addWorker() 返回 false 才会走 reject 策略。那么什么时候 addWorker() 什么时候会返回false呢?我们看代码:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            1.线程池已经shutdown,或者提交进来task为ull且队列也是空,返回false
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                2.如果需要创建核心线程但是当前线程已经大于corePoolSize 返回false,如果是非核心线程但是已经超出maximumPoolSize,返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                //省略代码。。。
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                        throw new IllegalThreadStateException();
                        //省略代码。。。
                    }
            }
        }
  //省略代码。。。
      }

addWorker() 有以下情况会返回 false :

  1. 线程池已经 shutdown,或者提交进来 task 为ull且同时任务队列也是空,返回 false。

  2. 如果需要创建核心线程但是当前线程已经大于 corePoolSize 返回 false,如果是非核心线程但是已经超出 maximumPoolSize ,返回 false。

  3. 创建线程后,检查是否已经启动。

我们逐条检查。第一点只有线程池被 shutDown() 才会出现。第二点由于 CacheThreadPool 的 corePoolSize 是 0 , maximumPoolSize  是 Intger.MAX_VALUE ,所以也不会出现。第三点是保护性错误,我猜因为线程允许通过外部的 ThreadFactory 创建,所以检查了一下是否外部已经 start,如果开发者编码规范,一般这种情况也不会出现。

综上,在线程池没有 shutDown 的情况下,addWorker() 不会返回 false ,不会走reject流程,所以理论上 CacheThreadPool 可以一直提交任务,符合CacheThreadPool注释里的描述。

引申

Executors 还提供了这么一个方法 Executors.newFixedThreadPool(4) 来创建一个有固定线程数量的线程池,我们看看创建的参数:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

参数中核心线程和最大线程一样,线程保留时间 0 ,使用 LinkedBlockingQueue 作为任务队列,这样的线程池有什么样的特性呢?我们看看注释说明,大意是说这是一个有着固定线程数量且使用*队列作为线程队列的线程池。如果有新的任务提交,但是没有线程可用,这个任务会一直等待直到有可用的线程。如果一个线程因为异常终止了,当线程不够用的时候会再创建一个出来。线程会一直保持,直到线程池 shutDown。

和 CacheThreadPool 相比,FixedThreadPool 注释里描述的特性有几个不同的地方。

  1. 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。

  2. 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。

  3. 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool 占用资源更多。

FixedThreadPool 和 CacheThreadPool 也有相同点,都使用*队列,意味着可用一直向线程池提交任务,不会触发 reject 策略。

 

总结

好了,又到了总结的时候,相信各位认真看完的应该对链表的基本操作非常熟悉了.

CacheThreadPool 的运行流程如下:

  1. 提交任务进线程池。

  2. 因为 corePoolSize 为0的关系,不创建核心线程。

  3. 尝试将任务添加到 SynchronousQueue 队列。

  4. 如果SynchronousQueue 入列成功,等待被当前运行的线程空闲后拉取执行。如果当前运行线程为0,调用addWorker( null , false )创建一个非核心线程出来,然后从 SynchronousQueue 拉取任务并在当前线程执行,实现线程的复用。

  5. 如果 SynchronousQueue 已有任务在等待,入列失败。因为 maximumPoolSize 无上限的原因,创建新的非核心线程来执行任务。

纵观整个流程,通过设置 ThreadPoolExecutor 的几个参数,并加上应用 SynchronousQueue 的特性,然后在 ThreadPoolExecutor 的运行框架下,构建出了一个可以线程复用的线程池。ThreadPoolExecutor 还有很强的扩展性,可以通过自定义参数来实现不同的线程池。这么牛X的代码,这辈子写是不可能写得出来了,争取能完全读懂吧。