ThreadPoolExecutor源码解析之机制原理篇
线程池可以解决两个不同问题:由于减少了每个任务的调用开销,在执行大量的异步任务时,它通常能够提供更好的性能,并且还可以提供绑定和管理资源(包括执行集合任务时使用的线程)的方法。每个 ThreadPoolExecutor还维护着一些基本的统计数据,如完成的任务数。
这是ThreadPoolExecutor文档中的说明。线程池其实就是把你提交的任务(task)进行调度管理运行,但这个调度的过程以及其中的状态控制是比较复杂的。
初始化参数介绍
直接看最完整的ThreadPoolExcuter的初始化函数:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
.........
}
- corePoolSize:核心线程数,在ThreadPoolExcutor中有一个与它相关的配置:allowCoreThreadTimeOut(默认为false),当allowCoreThreadTimeOut为false时,核心线程会一直存活,哪怕是一直空闲着。而当allowCoreThreadTimeOut为true时核心线程空闲时间超过keepAliveTime时会被回收。
- maximumPoolSize:最大线程数,线程池能容纳的最大线程数,当线程池中的线程达到最大时,此时添加任务将会采用拒绝策略,默认的拒绝策略是抛出一个运行时错误(RejectedExecutionException)。值得一提的是,当初始化时用的工作队列为LinkedBlockingQeque时,这个值将无效。
- keepAliveTime:存活时间,当非核心空闲超过这个时间将被回收,同时空闲核心线程是否回收受allowCoreThreadTimeOut影响。
- unit:keepAliveTime的单位。
- workQueue:任务队列,常用有三种队列,即SynchronousQueue,LinkedBlockingDeque(*队列),ArrayBlockingQueue(有界队列)
- threadFactory:线程工厂,ThreadFactory是一个接口,用来创建worker。通过线程工厂可以对线程的一些属性进行定制。默认直接新建线程。
- RejectedExecutionHandler:也是一个接口,只有一个方法,当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的rejectedExecution法。默认是抛出一个运行时异常。
这么多参数看起来好像很复杂,所以Java贴心得为我们准备了便捷的API,即可以直接用Executors创建各种线程池。分别是:
//创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
//通过设置corePoolSize为0,而maximumPoolSize为Integer.Max_VALUE(Int型数据最大值)实现。
ExecutorService cache =Executors.newCachedThreadPool();
//创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
//通过将corePoolSize和maximumPoolSize的值设置为一样的值来实现。
ExecutorService fixed= Executors.newFixedThreadPool(num);
//创建一个定长线程池,支持定时及周期性任务执行。
//通过将队列参数workQueue设置为DelayWorkQueue来实现。
ExecutorService schedule= Executors.newScheduledThreadPool(5);
//创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
//通过将corePoolSize和maximumPoolSize都设置为1来实现。
ExecutorService single= Executors.newSingleThreadExecutor();
这里需要做一个额外说明,在ThreadPoolExcuter中,worker和task是有区别的,task是用户提交的任务,而worker则是用来执行task的线程。在初始化参数中,corePoolSize和maximumPoolSize都是针对worker的,而workQueue是用来存放task的
worker介绍
前面有介绍了一下worker和task的区别,其中task是用户提交的线程任务,而worker则是ThreadPoolExecutor自己内部实现的一个类了。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** 工作线程,如果工厂失败则为空. */
final Thread thread;
/** 初始化任务,有可能为空 */
Runnable firstTask;
/** 已完成的任务计数 */
volatile long completedTasks;
/**
* 创建并初始化第一个任务,使用线程工厂来创建线程
* 初始化有3步
*1、设置AQS的同步状态为-1,表示该对象需要被唤醒
*2、初始化第一个任务
*3、调用ThreadFactory来使自身创建一个线程,并赋值给worker的成员变量thread
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//重写Runnable的run方法
/** Delegates main run loop to outer runWorker */
public void run() {
//调用ThreadPoolExecutor的runWorker方法
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
//代表是否独占锁,0-非独占 1-独占
protected boolean isHeldExclusively() {
return getState() != 0;
}
//重写AQS的tryAcquire方法尝试获取锁
protected boolean tryAcquire(int unused) {
//尝试将AQS的同步状态从0改为1
if (compareAndSetState(0, 1)) {
//如果改变成,则将当前独占模式的线程设置为当前线程并返回true
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//否则返回false
return false;
}
//重写AQS的tryRelease尝试释放锁
protected boolean tryRelease(int unused) {
//设置当前独占模式的线程为null
setExclusiveOwnerThread(null);
//设置AQS同步状态为0
setState(0);
//返回true
return true;
}
//获取锁
public void lock() { acquire(1); }
//尝试获取锁
public boolean tryLock() { return tryAcquire(1); }
//释放锁
public void unlock() { release(1); }
//是否被独占
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker其实可以看作高级一点的线程。其中继承AbstractQueuedSynchronizer主要是为了实现锁控制。ThreadPoolExecutor会持有并管理Worker,在Worker中firstTask其实就是存放task的,而thread则是存放当前Worker本身的线程。
ctl介绍以及运行状态说明
先看源码
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//用来表示线程池数量的位数,很明显是29,Integer.SIZE=32
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最大数量,2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//我们可以看出有5种runState状态,证明至少需要3位来表示runState状态
//所以高三位就是表示runState了
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//用于存放线程任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//重入锁
private final ReentrantLock mainLock = new ReentrantLock();
//线程池当中的线程集合,只有当拥有mainLock锁的时候,才可以进行访问
private final HashSet<Worker> workers = new HashSet<Worker>();
//等待条件支持终止
private final Condition termination = mainLock.newCondition();
//创建新线程的线程工厂
private volatile ThreadFactory threadFactory;
//饱和策略
private volatile RejectedExecutionHandler handler;
}
ThreadPoolExcuter是将两个内部值打包成一个值,即将workerCount和runState(运行状态)这两个值打包在一个ctl中,因为runState有5个值,需要3位,所以有3位表示runState,而其他29位表示为workerCount。而运行时要获取其他数据时,只需要对ctl进行拆包即可。
在这里我们讲一下这个线程池最大数量的计算吧,因为这里涉及到源码以及位移之类的操作,我感觉大多数人都还是不太会这个,因为我一开始看的时候也是不太会的。
private static final int CAPACITY = (1 << COUNT_BITS) - 1; 从代码我们可以看出,是需要 1往左移29位 ,然后再减去1,那个 1往左移29位 是怎么计算的呢?1 << COUNT_BITS 1的32位2进制是 00000000 00000000 00000000 00000001 左移29位的话就是 00100000 00000000 00000000 00000000 再进行减一的操作 000 11111 11111111 11111111 11111111 也就是说线程池最大数目就是 000 11111 11111111 11111111 11111111
runState
正数的原码、反码、补码都是一样的。在计算机底层,是用补码来表示的。
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;-1 << COUNT_BITS 这里是-1往左移29位,稍微有点不一样,-1的话需要我们自己算出补码来 -1的原码 10000000 00000000 00000000 00000001 -1的反码,负数的反码是将原码除符号位以外全部取反 11111111 11111111 11111111 11111110 -1的补码,负数的补码就是将反码+1 11111111 11111111 11111111 11111111 关键了,往左移29位,所以高3位全是1就是RUNNING状态 111 00000 00000000 00000000 00000000下面几种状态以此类推!
execute方法
ThreadPoolExecutor类的核心调度方法是execute(),通过调用这个方法可以向线程池提交一个任务,交由线程池去执行。而ThreadPoolExecutor的工作逻辑也可以藉由这个方法来一步步理清。这是方法的源码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取ctl的值,前面说了,该值记录着runState和workerCount
int c = ctl.get();
/*
* 调用workerCountOf得到当前活动的线程数;
* 当前活动线程数小于corePoolSize,新建一个线程放入线程池中;
* addWorker(): 把任务添加到该线程中。
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
//如果上面的添加线程操作失败,重新获取ctl值
c = ctl.get();
}
//如果当前线程池是运行状态,并且往工作队列中添加该任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/*
* 如果当前线程不是运行状态,把任务从队列中移除
* 调用reject(内部调用handler)拒绝接受任务
*/
if (! isRunning(recheck) && remove(command))
reject(command);
//获取线程池中的有效线程数,如果为0,则执行addWorker创建一个新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
简单概括一下代码的逻辑,大概是这样:
- 首先判断任务是否为空,空则抛出空指针异常
- 不为空则获取线程池控制状态,判断小于corePoolSize,添加到worker集合当中执行,
- 如成功,则返回
- 失败的话再接着获取线程池控制状态,因为只有状态变了才会失败,所以重新获取
- 判断线程池是否处于运行状态,是的话则添加command到阻塞队列,加入时也会再次获取状态并且检测
- 状态是否不处于运行状态,不处于的话则将command从阻塞队列移除,并且拒绝任务
- 如果线程池里没有了线程,则创建新的线程去执行获取阻塞队列的任务执行
- 如果以上都没执行成功,则需要开启最大线程池里的线程来执行任务,失败的话就丢弃
addWorker(Runnable firstTask, boolean core)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
1、获取线程池的控制状态,进行判断,不符合则返回false,符合则下一步
2、死循环,判断workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,没有的话则对workerCount+1操作,
3、如果不符合上述判断或+1操作失败,再次获取线程池的控制状态,获取runState与刚开始获取的runState相比,不一致则跳出内层循环继续外层循环,否则继续内层循环
4、+1操作成功后,使用重入锁ReentrantLock来保证往workers当中添加worker实例,添加成功就启动该实例。
addWorkerFailed(Worker w)
addWorker方法添加worker失败,并且没有成功启动任务的时候,就会调用此方法,将任务从workers中移除,并且workerCount做-1操作。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
tryTerminate()
当对线程池执行了非正常成功逻辑的操作时,都会需要执行tryTerminate尝试终止线程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
runWorker(Worker w)
该方法的作用就是去执行任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
//允许中断
w.unlock(); // allow interrupts
//是否因为异常退出循环的标志,processWorkerExit方法会对该参数做判断
boolean completedAbruptly = true;
try {
//判断task是否为null,是的话通过getTask()从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
/* 这里的判断主要逻辑是这样:
* 如果线程池正在停止,那么就确保当前线程是中断状态;
* 如果不是的话,就要保证不是中断状态
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//用于记录任务执行前需要做哪些事,属于ThreadPoolExecutor类中的方法, //是空的,需要子类具体实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
总结一下runWorker方法的运行逻辑:
1、通过while循环不断地通过getTask()方法从队列中获取任务;
2、如果线程池正在停止状态,确保当前的线程是中断状态,否则确保当前线程不中断;
3、调用task的run()方法执行任务,执行完毕后需要置为null;
4、循环调用getTask()取不到任务了,跳出循环,执行processWorkerExit()方法。
过完runWorker()的运行流程,我们来看下getTask()是怎么实现的。getTask()方法的作用是从队列中获取任务,下面是该方法的源码:
private Runnable getTask() {
//标志是否获取任务超时
boolean timedOut = false; // Did the last poll() time out?
//死循环
for (;;) {
//获取线程池的控制状态
int c = ctl.get();
//获取线程池的runState
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
*判断线程池的状态,出现以下两种情况
*1、runState大于等于SHUTDOWN状态
*2、runState大于等于STOP或者阻塞队列为空
*将会通过CAS操作,进行workerCount-1并返回null
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取线程池的workerCount
int wc = workerCountOf(c);
// Are workers subject to culling?
/*
*allowCoreThreadTimeOut:是否允许core Thread超时,默认false
*workerCount是否大于核心核心线程池
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
*1、wc大于maximumPoolSize或者已超时
*2、队列不为空时保证至少有一个任务
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
/*
*通过CAS操作,workerCount-1
*能进行-1操作,证明wc大于maximumPoolSize或者已经超时
*/
if (compareAndDecrementWorkerCount(c))
//-1操作成功,返回null
return null;
//-1操作失败,继续循环
continue;
}
try {
/*
*wc大于核心线程池
*执行poll方法
*小于核心线程池
*执行take方法
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//判断任务不为空返回任务
if (r != null)
return r;
//获取一段时间没有获取到,获取超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 获取线程池控制状态和runState,判断线程池是否已经关闭或者正在关闭,是的话则workerCount-1操作返回null
- 获取workerCount判断是否大于核心线程池
- 判断workerCount是否大于最大线程池数目或者已经超时,是的话workerCount-1,-1成功则返回null,不成功则回到步骤1重新继续
- 判断workerCount是否大于核心线程池,大于则用poll方法从队列获取任务,否则用take方法从队列获取任务
- 判断任务是否为空,不为空则返回获取的任务,否则回到步骤1重新继续
processWorkerExit方法
processWorkerExit方法的作用主要是对worker对象的移除,下面是方法的源码:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//是异常退出的话,执行程序将workerCount数量减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从workers的集合中移除worker对象,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
至此,从executor方法开始的整个运行过程就完毕了,总结一下该流程:执行executor --> 新建Worker对象,并实例化线程 --> 调用runWorker方法,通过getTask()获取任务,并执行run方法 --> getTask()方法中不断向队列取任务,并将workerCount数量减1,直至返回null --> 调用processWorkerExit清除worker对象。
ThreadPoolExecutor创建线程池实例
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(5, 10, 300, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
//用lambda表达式编写方法体中的逻辑
Runnable run = () -> {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "正在执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
for (int i = 0; i < 10; i++) {
service.execute(run);
}
//这里一定要做关闭
service.shutdown();
}