ThreadPoolExecutor源码解析之机制原理篇

线程池可以解决两个不同问题:由于减少了每个任务的调用开销,在执行大量的异步任务时,它通常能够提供更好的性能,并且还可以提供绑定和管理资源(包括执行集合任务时使用的线程)的方法。每个 ThreadPoolExecutor还维护着一些基本的统计数据,如完成的任务数。

这是ThreadPoolExecutor文档中的说明。线程池其实就是把你提交的任务(task)进行调度管理运行,但这个调度的过程以及其中的状态控制是比较复杂的。

初始化参数介绍

直接看最完整的ThreadPoolExcuter的初始化函数:

  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {


                          .........


                       }
  • corePoolSize:核心线程数,在ThreadPoolExcutor中有一个与它相关的配置:allowCoreThreadTimeOut(默认为false),当allowCoreThreadTimeOut为false时,核心线程会一直存活,哪怕是一直空闲着。而当allowCoreThreadTimeOut为true时核心线程空闲时间超过keepAliveTime时会被回收。
  • maximumPoolSize:最大线程数,线程池能容纳的最大线程数,当线程池中的线程达到最大时,此时添加任务将会采用拒绝策略,默认的拒绝策略是抛出一个运行时错误(RejectedExecutionException)。值得一提的是,当初始化时用的工作队列为LinkedBlockingQeque时,这个值将无效。
  • keepAliveTime:存活时间,当非核心空闲超过这个时间将被回收,同时空闲核心线程是否回收受allowCoreThreadTimeOut影响。
  • unit:keepAliveTime的单位。
  • workQueue:任务队列,常用有三种队列,即SynchronousQueue,LinkedBlockingDeque(*队列),ArrayBlockingQueue(有界队列)
  • threadFactory:线程工厂,ThreadFactory是一个接口,用来创建worker。通过线程工厂可以对线程的一些属性进行定制。默认直接新建线程。
  • RejectedExecutionHandler:也是一个接口,只有一个方法,当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的rejectedExecution法。默认是抛出一个运行时异常。

这么多参数看起来好像很复杂,所以Java贴心得为我们准备了便捷的API,即可以直接用Executors创建各种线程池。分别是:

//创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
//通过设置corePoolSize为0,而maximumPoolSize为Integer.Max_VALUE(Int型数据最大值)实现。
ExecutorService cache =Executors.newCachedThreadPool();

//创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
//通过将corePoolSize和maximumPoolSize的值设置为一样的值来实现。
 ExecutorService fixed= Executors.newFixedThreadPool(num);

//创建一个定长线程池,支持定时及周期性任务执行。
//通过将队列参数workQueue设置为DelayWorkQueue来实现。 
ExecutorService schedule= Executors.newScheduledThreadPool(5);

//创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
//通过将corePoolSize和maximumPoolSize都设置为1来实现。
 ExecutorService single= Executors.newSingleThreadExecutor();

这里需要做一个额外说明,在ThreadPoolExcuter中,worker和task是有区别的,task是用户提交的任务,而worker则是用来执行task的线程。在初始化参数中,corePoolSize和maximumPoolSize都是针对worker的,而workQueue是用来存放task的

worker介绍

前面有介绍了一下worker和task的区别,其中task是用户提交的线程任务,而worker则是ThreadPoolExecutor自己内部实现的一个类了。

private final class Worker
 extends AbstractQueuedSynchronizer
 implements Runnable
 {
 /**
 * This class will never be serialized, but we provide a
 * serialVersionUID to suppress a javac warning.
 */
 private static final long serialVersionUID = 6138294804551838833L;
 ​
 /** 工作线程,如果工厂失败则为空. */
 final Thread thread;
 /** 初始化任务,有可能为空 */
 Runnable firstTask;
 /** 已完成的任务计数 */
 volatile long completedTasks;
 ​
 /**
 * 创建并初始化第一个任务,使用线程工厂来创建线程
 * 初始化有3步
 *1、设置AQS的同步状态为-1,表示该对象需要被唤醒
 *2、初始化第一个任务
 *3、调用ThreadFactory来使自身创建一个线程,并赋值给worker的成员变量thread
 */
 Worker(Runnable firstTask) {
 setState(-1); // inhibit interrupts until runWorker
 this.firstTask = firstTask;
 this.thread = getThreadFactory().newThread(this);
 }
 ​
 //重写Runnable的run方法
 /** Delegates main run loop to outer runWorker */
 public void run() {
 //调用ThreadPoolExecutor的runWorker方法
 runWorker(this);
 }
 ​
 // Lock methods
 //
 // The value 0 represents the unlocked state.
 // The value 1 represents the locked state.
 //代表是否独占锁,0-非独占 1-独占
 protected boolean isHeldExclusively() {
 return getState() != 0;
 }
 //重写AQS的tryAcquire方法尝试获取锁
 protected boolean tryAcquire(int unused) {
 //尝试将AQS的同步状态从0改为1
 if (compareAndSetState(0, 1)) {
 //如果改变成,则将当前独占模式的线程设置为当前线程并返回true
 setExclusiveOwnerThread(Thread.currentThread());
 return true;
 }
 //否则返回false
 return false;
 }
 ​
 //重写AQS的tryRelease尝试释放锁
 protected boolean tryRelease(int unused) {
 //设置当前独占模式的线程为null
 setExclusiveOwnerThread(null);
 //设置AQS同步状态为0
 setState(0);
 //返回true
 return true;
 }
 ​
 //获取锁
 public void lock() { acquire(1); }
 //尝试获取锁
 public boolean tryLock() { return tryAcquire(1); }
 //释放锁
 public void unlock() { release(1); }
 //是否被独占
 public boolean isLocked() { return isHeldExclusively(); }
 ​
 void interruptIfStarted() {
 Thread t;
 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
 try {
 t.interrupt();
 } catch (SecurityException ignore) {
 }
 }
 }
 }

Worker其实可以看作高级一点的线程。其中继承AbstractQueuedSynchronizer主要是为了实现锁控制。ThreadPoolExecutor会持有并管理Worker,在Worker中firstTask其实就是存放task的,而thread则是存放当前Worker本身的线程。

ctl介绍以及运行状态说明

先看源码

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 //用来表示线程池数量的位数,很明显是29,Integer.SIZE=32
 private static final int COUNT_BITS = Integer.SIZE - 3;
 //线程池最大数量,2^29 - 1
 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
 // runState is stored in the high-order bits
 //我们可以看出有5种runState状态,证明至少需要3位来表示runState状态
 //所以高三位就是表示runState了
 private static final int RUNNING = -1 << COUNT_BITS;
 private static final int SHUTDOWN = 0 << COUNT_BITS;
 private static final int STOP = 1 << COUNT_BITS;
 private static final int TIDYING = 2 << COUNT_BITS;
 private static final int TERMINATED = 3 << COUNT_BITS;
 // Packing and unpacking ctl
 private static int runStateOf(int c) { return c & ~CAPACITY; }
 private static int workerCountOf(int c) { return c & CAPACITY; }
 private static int ctlOf(int rs, int wc) { return rs | wc; }
 //用于存放线程任务的阻塞队列
 private final BlockingQueue<Runnable> workQueue;
 //重入锁
 private final ReentrantLock mainLock = new ReentrantLock();
 //线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问
 private final HashSet<Worker> workers = new HashSet<Worker>();
 //等待条件支持终止
 private final Condition termination = mainLock.newCondition();
 //创建新线程的线程工厂
 private volatile ThreadFactory threadFactory;
 //饱和策略
 private volatile RejectedExecutionHandler handler;
}

ThreadPoolExcuter是将两个内部值打包成一个值,即将workerCount和runState(运行状态)这两个值打包在一个ctl中,因为runState有5个值,需要3位,所以有3位表示runState,而其他29位表示为workerCount。而运行时要获取其他数据时,只需要对ctl进行拆包即可。

在这里我们讲一下这个线程池最大数量的计算吧,因为这里涉及到源码以及位移之类的操作,我感觉大多数人都还是不太会这个,因为我一开始看的时候也是不太会的。

private static final int CAPACITY = (1 << COUNT_BITS) - 1;
从代码我们可以看出,是需要 1往左移29位 ,然后再减去1,那个 1往左移29位 是怎么计算的呢?
1 << COUNT_BITS
 ​
 1的32位2进制是
 00000000 00000000 00000000 00000001
 ​
 左移29位的话就是
 00100000 00000000 00000000 00000000
 ​
 再进行减一的操作
 000 11111 11111111 11111111 11111111
 ​
 也就是说线程池最大数目就是
 000 11111 11111111 11111111 11111111

runState

正数的原码、反码、补码都是一样的。在计算机底层,是用补码来表示的。

private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
-1 << COUNT_BITS
这里是-1往左移29位,稍微有点不一样,-1的话需要我们自己算出补码来
 ​
-1的原码
10000000 00000000 00000000 00000001
 ​
-1的反码,负数的反码是将原码除符号位以外全部取反
11111111 11111111 11111111 11111110
 ​
-1的补码,负数的补码就是将反码+1
11111111 11111111 11111111 11111111
 ​
关键了,往左移29位,所以高3位全是1就是RUNNING状态
111 00000 00000000 00000000 00000000

下面几种状态以此类推!

execute方法

ThreadPoolExecutor类的核心调度方法是execute(),通过调用这个方法可以向线程池提交一个任务,交由线程池去执行。而ThreadPoolExecutor的工作逻辑也可以藉由这个方法来一步步理清。这是方法的源码:

public void execute(Runnable command) {
 if (command == null)
 throw new NullPointerException();
 //获取ctl的值,前面说了,该值记录着runState和workerCount
 int c = ctl.get();
 /*
 * 调用workerCountOf得到当前活动的线程数;
 * 当前活动线程数小于corePoolSize,新建一个线程放入线程池中;
 * addWorker(): 把任务添加到该线程中。
 */
 if (workerCountOf(c) < corePoolSize) {
 if (addWorker(command, true))
 return;
 //如果上面的添加线程操作失败,重新获取ctl值
 c = ctl.get();
 }
 //如果当前线程池是运行状态,并且往工作队列中添加该任务
 if (isRunning(c) && workQueue.offer(command)) {
 int recheck = ctl.get();
 /*
 * 如果当前线程不是运行状态,把任务从队列中移除
 * 调用reject(内部调用handler)拒绝接受任务 
 */
 if (! isRunning(recheck) && remove(command))
 reject(command);
 //获取线程池中的有效线程数,如果为0,则执行addWorker创建一个新线程
 else if (workerCountOf(recheck) == 0)
 addWorker(null, false);
 }
 /*
 * 如果执行到这里,有两种情况:
 * 1. 线程池已经不是RUNNING状态;
 * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
 * 如果失败则拒绝该任务
 */
 else if (!addWorker(command, false))
 reject(command);
}

简单概括一下代码的逻辑,大概是这样:

  1. 首先判断任务是否为空,空则抛出空指针异常
  2. 不为空则获取线程池控制状态,判断小于corePoolSize,添加到worker集合当中执行,
  3. 如成功,则返回
  4. 失败的话再接着获取线程池控制状态,因为只有状态变了才会失败,所以重新获取
  5. 判断线程池是否处于运行状态,是的话则添加command到阻塞队列,加入时也会再次获取状态并且检测
  6. ​ 状态是否不处于运行状态,不处于的话则将command从阻塞队列移除,并且拒绝任务
  7. 如果线程池里没有了线程,则创建新的线程去执行获取阻塞队列的任务执行
  8. 如果以上都没执行成功,则需要开启最大线程池里的线程来执行任务,失败的话就丢弃

addWorker(Runnable firstTask, boolean core)

  private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

1、获取线程池的控制状态,进行判断,不符合则返回false,符合则下一步

2、死循环,判断workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,没有的话则对workerCount+1操作,

3、如果不符合上述判断或+1操作失败,再次获取线程池的控制状态,获取runState与刚开始获取的runState相比,不一致则跳出内层循环继续外层循环,否则继续内层循环

4、+1操作成功后,使用重入锁ReentrantLock来保证往workers当中添加worker实例,添加成功就启动该实例。

addWorkerFailed(Worker w)

addWorker方法添加worker失败,并且没有成功启动任务的时候,就会调用此方法,将任务从workers中移除,并且workerCount做-1操作。

  private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

tryTerminate()

当对线程池执行了非正常成功逻辑的操作时,都会需要执行tryTerminate尝试终止线程池

  final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

runWorker(Worker w)

该方法的作用就是去执行任务

final void runWorker(Worker w) {
 Thread wt = Thread.currentThread();
 //获取第一个任务
 Runnable task = w.firstTask;
 w.firstTask = null;
 //允许中断
 w.unlock(); // allow interrupts
 //是否因为异常退出循环的标志,processWorkerExit方法会对该参数做判断
 boolean completedAbruptly = true;
 try {
 //判断task是否为null,是的话通过getTask()从队列中获取任务
 while (task != null || (task = getTask()) != null) {
 w.lock();
 // If pool is stopping, ensure thread is interrupted;
 // if not, ensure thread is not interrupted. This
 // requires a recheck in second case to deal with
 // shutdownNow race while clearing interrupt
 /* 这里的判断主要逻辑是这样:
 * 如果线程池正在停止,那么就确保当前线程是中断状态;
 * 如果不是的话,就要保证不是中断状态
 */
 if ((runStateAtLeast(ctl.get(), STOP) ||
 (Thread.interrupted() &&
 runStateAtLeast(ctl.get(), STOP))) &&
 !wt.isInterrupted())
 wt.interrupt();
 try {
 //用于记录任务执行前需要做哪些事,属于ThreadPoolExecutor类中的方法, //是空的,需要子类具体实现
 beforeExecute(wt, task);
 Throwable thrown = null;
 try {
 //执行任务
 task.run();
 } catch (RuntimeException x) {
 thrown = x; throw x;
 } catch (Error x) {
 thrown = x; throw x;
 } catch (Throwable x) {
 thrown = x; throw new Error(x);
 } finally {
 afterExecute(task, thrown);
 }
 } finally {
 task = null;
 w.completedTasks++;
 w.unlock();
 }
 }
 completedAbruptly = false;
 } finally { 
 processWorkerExit(w, completedAbruptly);
 }
}

总结一下runWorker方法的运行逻辑:

1、通过while循环不断地通过getTask()方法从队列中获取任务;

2、如果线程池正在停止状态,确保当前的线程是中断状态,否则确保当前线程不中断;

3、调用task的run()方法执行任务,执行完毕后需要置为null;

4、循环调用getTask()取不到任务了,跳出循环,执行processWorkerExit()方法。

过完runWorker()的运行流程,我们来看下getTask()是怎么实现的。getTask()方法的作用是从队列中获取任务,下面是该方法的源码:

private Runnable getTask() {
 //标志是否获取任务超时
 boolean timedOut = false; // Did the last poll() time out?
 ​
 //死循环
 for (;;) {
 //获取线程池的控制状态
 int c = ctl.get();
 //获取线程池的runState
 int rs = runStateOf(c);
 ​
 // Check if queue empty only if necessary.
 /*
 *判断线程池的状态,出现以下两种情况
 *1、runState大于等于SHUTDOWN状态
 *2、runState大于等于STOP或者阻塞队列为空
 *将会通过CAS操作,进行workerCount-1并返回null
 */
 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
 decrementWorkerCount();
 return null;
 }
 ​
 //获取线程池的workerCount
 int wc = workerCountOf(c);
 ​
 // Are workers subject to culling?
 /*
 *allowCoreThreadTimeOut:是否允许core Thread超时,默认false
 *workerCount是否大于核心核心线程池
 */
 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 ​
 /*
 *1、wc大于maximumPoolSize或者已超时
 *2、队列不为空时保证至少有一个任务
 */
 if ((wc > maximumPoolSize || (timed && timedOut))
 && (wc > 1 || workQueue.isEmpty())) {
 /*
 *通过CAS操作,workerCount-1
 *能进行-1操作,证明wc大于maximumPoolSize或者已经超时
 */
 if (compareAndDecrementWorkerCount(c))
 //-1操作成功,返回null
 return null;
 //-1操作失败,继续循环
 continue;
 }
 ​
 try {
 /*
 *wc大于核心线程池
 *执行poll方法
 *小于核心线程池
 *执行take方法
 */
 Runnable r = timed ?
 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
 workQueue.take();
 //判断任务不为空返回任务
 if (r != null)
 return r;
 //获取一段时间没有获取到,获取超时
 timedOut = true;
 } catch (InterruptedException retry) {
 timedOut = false;
 }
 }
 }
  1. 获取线程池控制状态和runState,判断线程池是否已经关闭或者正在关闭,是的话则workerCount-1操作返回null
  2. 获取workerCount判断是否大于核心线程池
  3. 判断workerCount是否大于最大线程池数目或者已经超时,是的话workerCount-1,-1成功则返回null,不成功则回到步骤1重新继续
  4. 判断workerCount是否大于核心线程池,大于则用poll方法从队列获取任务,否则用take方法从队列获取任务
  5. 判断任务是否为空,不为空则返回获取的任务,否则回到步骤1重新继续

processWorkerExit方法

processWorkerExit方法的作用主要是对worker对象的移除,下面是方法的源码:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
 //是异常退出的话,执行程序将workerCount数量减1
 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
 decrementWorkerCount();
 final ReentrantLock mainLock = this.mainLock;
 mainLock.lock();
 try {
 completedTaskCount += w.completedTasks;
 // 从workers的集合中移除worker对象,也就表示着从线程池中移除了一个工作线程
 workers.remove(w);
 } finally {
 mainLock.unlock();
 }
 tryTerminate();
 int c = ctl.get();
 if (runStateLessThan(c, STOP)) {
 if (!completedAbruptly) {
 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
 if (min == 0 && ! workQueue.isEmpty())
 min = 1;
 if (workerCountOf(c) >= min)
 return; // replacement not needed
 }
 addWorker(null, false);
 }
}

至此,从executor方法开始的整个运行过程就完毕了,总结一下该流程:执行executor --> 新建Worker对象,并实例化线程 --> 调用runWorker方法,通过getTask()获取任务,并执行run方法 --> getTask()方法中不断向队列取任务,并将workerCount数量减1,直至返回null --> 调用processWorkerExit清除worker对象。

ThreadPoolExecutor创建线程池实例

  public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(5, 10, 300, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
        //用lambda表达式编写方法体中的逻辑
        Runnable run = () -> {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "正在执行");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 10; i++) {
            service.execute(run);
        }
        //这里一定要做关闭
        service.shutdown();
    }

ThreadPoolExecutor源码解析之机制原理篇