JUC源码分析-ScheduledThreadPoolExecutor
概述
ScheduledThreadPoolExecutor 预定任务线程池,用于执行延迟和周期性任务。
核心属性和数据结构
//是否应该废弃周期任务 当关闭时
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
//是否应该取消非周期任务 当关闭时
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
主要类图
ScheduledThreadPoolExecutor继承ThreadPoolExecutor实现了ScheduledExecutorService接口, 依赖DelayedWorkQueue实现延迟和排序,依赖ScheduledFutureTask实现周期性执行任务。
源码分析
ScheduledThreadPoolExecutor构造方法分析
常用的创建ScheduledThreadPoolExecutor的写法如下。
ScheduledExecutorService service=Executors.newScheduledThreadPool(3,new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
// TODO Auto-generated method stub
return new Thread(r,"ScheduledThreadPoolExecutorTest");
}
});
跟踪代码
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
设置一些默认参数
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);//新增defaultHandler,它是默认的拒绝策略实现
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
调用父类构造方法,就是ThreadPoolExecutor 前面文章有分析,这里不详细讲解了。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
注意到这里 corePoolSize==3,maximumPoolSize==Integer.MAX_VALUE,keepAliveTime==0,unit==TimeUnit.NANOSECONDS
workQueue== new DelayedWorkQueue()。
ScheduledExecutorService 常用方法如下
public interface ScheduledExecutorService extends ExecutorService {
//延迟delay 单位unit 执行command
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//延迟delay 单位unit 执行callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//延迟delay 单位unit, 以period 单位unit 为周期执行command
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
}
以schedule方法为例
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
实际上返回了 new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit))
triggerTime 根据延迟时间计算出触发时间
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
ScheduledFutureTask构造方法
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;//触发时间
this.period = 0;//触发周期等于0
this.sequenceNumber = sequencer.getAndIncrement();//
}
调用父类构造方法
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
任务执行分析
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())//如果关闭 执行拒绝策略
reject(task);
else {
super.getQueue().add(task);//重要
if (isShutdown() && 线程池关闭
!canRunInCurrentRunState(task.isPeriodic()) &&//不可以运行
remove(task))//删除任务成功
task.cancel(false);执行取消
else
ensurePrestart();//启动线程
}
}
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize) //工作线程数小于corePoolSize,启动工作线程
addWorker(null, true);
else if (wc == 0) //如果corePoolSize 至少启动一个工作线程
addWorker(null, false);
}
super.getQueue().add(task)分析
前面写到 workerQueue的类型是DelayedWorkQueue。
DelayedWorkQueue添加数据方法分析
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();//容量不够扩容
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {//队列不为空了 唤醒阻塞
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
这个方法中只有siftUp不好理解,重点分析。
private void siftUp(int k, RunnableScheduledFuture key) {
while (k > 0) {//通过构建最小二叉堆排序
int parent = (k - 1) >>> 1;
RunnableScheduledFuture e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
siftUp与PriorityBlockingQueue 添加数据实例中的siftUpComparable处理逻辑相似。都是构建最小二叉堆。
与PriorityBlockingQueue 队列相同DelayedWorkQueue的获取方法也有相应的siftDown方法
DelayedWorkQueue获取方法分析
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {//循环直到获取到数据
RunnableScheduledFuture first = queue[0];
if (first == null)//如果队列为空阻塞
available.await();
else {
//计算距离当前时间需要延迟的时间值
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)//不需要延迟 直接获取
return finishPoll(first);
else if (leader != null) // 如果其他线程进入 延迟等待,等待直到被唤醒
available.await();//code1
else {//需要延迟 等待,有一个线程执行这里,其他获取数据线程需要等待
Thread thisThread = Thread.currentThread();
leader = thisThread;//标记执行延迟等待的线程
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;//释放占据线程
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();//唤醒code1 等待的线程
lock.unlock();
}
}
finishPoll需要重点分析
private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {
int s = --size;
RunnableScheduledFuture x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
看到了DelayedWorkQueue获取方法的siftDown方法。siftDown方法的实现逻辑与PriorityBlockingQueue的siftDown相似。
DelayedWorkQueue与PriorityBlockingQueue 一样都使用了最小二叉堆,通过添加数据,获取数据时 构建二叉堆排序。二叉堆的数据结构和排序算法参照PriorityBlockingQueue分析篇。
compareTo分析
排序中用到的compareTo需要分析,key的类型是ScheduledFutureTask
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;//同一个实例,相等
if (other instanceof ScheduledFutureTask) {// 根据触发时间与other比较 。
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;//比较序号。先加入的任务序号小。
else
return 1;
}
//获取需要延迟的时间值比较
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
compareTo 简略的逻辑就是根据 触发时间比较当前实例与其他实例的大小,
分析到这里,清楚了加入workerQueue中的数据时间,获取后是有序的,先获取触发时间比较接近的。
ScheduledFutureTask run分析
条件的任务实际上执行的是ScheduledFutureTask的run方法
public void run() {
boolean periodic = isPeriodic();//判断是否有周期
if (!canRunInCurrentRunState(periodic))//判断是否应该执行取消
cancel(false);
else if (!periodic)//非周期性 直接执行
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {//执行并重置任务状态
setNextRunTime();//根据时间周期计算下一次触发时间
reExecutePeriodic(outerTask);//outerTask 等于this, 将当前任务重新投入队列。
}
}
上面粗体部分是实现周期执行的关键。
逻辑总结:
如果应该取消,执行取消。 如果是非周期性任务值直接执行,如果是周期性任务,先执行并重置任务,然后根据时间周期计算下一次触发时间,最后将当前任务重新投入队列。
分析到这里 延迟执行和周期性的执行都清除是怎么样实现的了。
总结:ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,没有使用它的线程池框架,自己来实现。逻辑比较简单 将任务加入优先级队列DelayedWorkQueue, 然后启动核心线程数的工作线程。依赖DelayedWorkQueue实现延迟和排序,依赖ScheduledFutureTask实现周期性执行任务,理解DelayedWorkQueue的排序和延迟实现, 周期性任务执行的逻辑是本文的重点。