ThreadPool 线程池详解

  在很多并发或异步场景中,我们总能看到线程池的身影,它几乎是Java 中运用的最多的并发框架,也是面试的常考点。但是我对它的运行机制、实现原理一直比较模糊,所以在此总结。

  • 本文观点多来自《并发编程的艺术》,这是一本学习并发的好书,即容易理解,也不失深度。
  • 这本书我看了三遍。书读百遍,其义自见。书中写得不清楚的内容,我加入了自己的理解,如有不正,请留言指正。
  • 码字不易,喜欢的朋友点个赞呗????

一、使用线程池的好处

合理的使用线程池能够带来3 个好处:

  • 降低资源消耗。线程池可以重复使用已经创建的线程,来降低线程创建和销毁带来的资源消耗。
  • 提高响应速度。当有一个新任务到达时,一般情况下任务不需要等待线程的创建就能够立即执行。
  • 提高线程的可管理性。我们知道一个线程的生成其实算是一个代价比较高的操作,如果无限制的生成不但会消耗系统资源,还会降低系统的稳定性,而使用线程池可以对线程进行统一分配、调优和监控。

二、怎样得到一个线程池

 要理解线程池的实现原理,先要知道一个线程池的来龙去脉。

  1. 自定义生成线程池
public ThreadPoolExecutor(int corePoolSize,	// 核心线程数
                              int maximumPoolSize,	// 最大线程数
                              long keepAliveTime,	// 线程存活时间
                              TimeUnit unit,// 存活时间的单位
                              BlockingQueue<Runnable> workQueue,	// 采用的阻塞队列
                              ThreadFactory threadFactory,		// 线程的创建工厂
                              RejectedExecutionHandler handler) 	// 线程池的拒绝策略
//默认拒绝策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();          
//默认线程创建工厂
public static ThreadFactory defaultThreadFactory() {return new DefaultThreadFactory();}

ThreadPool 线程池详解
线程池有4 个构造函数,简单的意思写在注释里面了,没有的参数采用默认值,下面详细介绍一下各参数的意义或取值:

  • corePoolSize:核心线程数。当一个任务提交给线程池后,如果当前线程的数量小于这个值,不管线程池中是否有空闲的线程,都会创建一个线程。可以调用prestartAllCoreThreads() 方法让线程池提前创建好所有的核心线程。
  • maximumPoolSize:线程池中允许创建的最大线程数。
  • keepAliveTime:线程池中的线程完成任务后,最多能够存活的时间。
  • unit:存活时间的时间单位,可以选的有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS)、千分之毫秒和纳秒(NANOSECONDS)。
  • workQueue:存储任务的阻塞队列,可选的队列有
    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
    • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
    • PriorityBlockingQueue:一个支持优先级排序的*阻塞队列。
    • DelayQueue:一个使用优先级队列实现的*阻塞队列。可用于周期任务。
    • SynchronousQueue:一个不存储元素的阻塞队列。一个任务必须等待一个线程获取任务。队列本身不存储任务,像手把手传递。
    • LinkedTransferQueue:一个由链表结构组成的*阻塞队列。
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
  • ThreadFactory:用于设置创建线程的工厂。
  • RejectedExecutionHandler:当线程池无法再接受任务后的拒绝策略。
    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理不丢掉

也可以自定义实现拒绝策略

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

通过这些参数我们就可以设计出最适合生产环境的线程池


  1. 通过Executor 框架获得
    除了自定义生成线程池外,Java 也提供了一些设计好的线程池。
  • FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,  // 核心线程和最大线程数相同
                                      0L, TimeUnit.MILLISECONDS, // 线程存活时间为0
                                      new LinkedBlockingQueue<Runnable>());// 采用有界链表阻塞队列(最多存储Integer.MAX_VALUE)
                                      //采用默认拒绝策略——直接抛出异常
    }

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

FixedThreadPool 适用于为了满足资源管理的需求,而需要限制当前线程数量的场景,适用于负载比较重的服务器

  • SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,			//  核心线程和最大线程数都为1
                                    0L, TimeUnit.MILLISECONDS,  // 线程存活时间为0
                                    new LinkedBlockingQueue<Runnable>()));/ 采用有界链表阻塞队列(最多存储Integer.MAX_VALUE)
                                    //采用默认拒绝策略——直接抛出异常
    }

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)

SingleThreadExecutor 适用于需要保证顺序地执行各个任务;并且任意时间点,不会有多个线程是活动的应用场景。

  • CachedThreadPool
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//  核心线程为0,最大线程数都为Integer.MAX_VALUE
                                      60L, TimeUnit.SECONDS,//每个线程存活60分钟
                                      new SynchronousQueue<Runnable>());//不存储元素的阻塞队列
                                      //采用默认拒绝策略——直接抛出异常
    }

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)

CachedThreadPool适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器

  • ScheduledThreadPool

ScheduledThreadPool 有两种实现

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
 
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
              // 指定核心线程大小,最大线程数为Integer.MAX_VALUE
              // 线程无存活时间
              //DelayedWorkQueue 多用于处理周期任务
              //采用默认拒绝策略——直接抛出异常
    }
   
 public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

多个后台线程执行周期任务,主要为了满足资源管理的要求而需要限制后台进程的数量
单个后台线程执行周期任务,主要为了保证顺序执行各个任务的应用场景

三、线程池的执行流程及源码解读

通过前面的介绍我们对线程池应该有一个大致的了解,下面我们具体来看看一个任务是怎么被添加。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
...

在执行代码之前注释给了三步走,有道翻译了一下:

  1. 如果小于正在执行的核心线程数量,就开一个新的线程执行这个任务(需要获取锁)。包装成工作线程要检查运行状态和工作数量。避免在线程不应该被添加时添加。
  2. 如果任务能够成功入队,我们还是需要二次检验是否应该添加,因为可能在上一次检查过后有新的空闲线程,或者线程池在我们进入方法后已经关闭。。。
  3. 如果不能入队,那么我们要尝试添加一个线程(需要获取锁)。
  4. 如果失败,我们就知道线程池是关闭了还是饱和了还是这个任务被拒绝了。

总结一下就是下面这张图:
ThreadPool 线程池详解
继续看源码是怎么操作的:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 获取线程池线程数量(见代码解释①)
        int c = ctl.get();
        //第一步,小于核心线程数尝试添加核心线程
        if (workerCountOf(c) < corePoolSize) {
        	//(addWorker见代码解释②)
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 第二步,添加队列
        //如果线程池还在执行并且添加阻塞队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次检验
            // 如果线程池没有执行并且删除任务成功,拒绝任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如果线程池还在执行或者删除任务失败,并且重新检验线程数量为0
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 第三步,添加一个线程,失败则拒绝该任务
        else if (!addWorker(command, false))
            reject(command);
    }
  • 代码解释①

The main pool control state, ctl, is an atomic integer packing two conceptual fields

ctl 是线程池用来表示线程池状态和线程池线程数量的一个字段,如何实现的有兴趣的朋友可以去了解一下。

	//线程池状态
	private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  • 代码解释②
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 当线程池处于关闭状态,并且队列为空
            if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&  firstTask == null && ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 大于线程池容量或大于核心/ 最大线程数量添加失败
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 线程数CAS+1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 添加线程操作
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        	// 获取全局锁
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker 有两个参数

core:为true 代表添加核心线程,false 代表添加核心线程以外的线程

firstTask:执行的线程,可为空