JUC源码分析-ScheduledThreadPoolExecutor

概述

ScheduledThreadPoolExecutor 预定任务线程池,用于执行延迟和周期性任务。

 

 

核心属性和数据结构

//是否应该废弃周期任务 当关闭时

private volatile boolean continueExistingPeriodicTasksAfterShutdown;

//是否应该取消非周期任务 当关闭时

private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

主要类图

JUC源码分析-ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承ThreadPoolExecutor实现了ScheduledExecutorService接口, 依赖DelayedWorkQueue实现延迟和排序,依赖ScheduledFutureTask实现周期性执行任务。

 

源码分析

ScheduledThreadPoolExecutor构造方法分析

常用的创建ScheduledThreadPoolExecutor的写法如下。

ScheduledExecutorService service=Executors.newScheduledThreadPool(3,new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

// TODO Auto-generated method stub

return new Thread(r,"ScheduledThreadPoolExecutorTest");

}

});

跟踪代码

public static ScheduledExecutorService newScheduledThreadPool(

int corePoolSize, ThreadFactory threadFactory) {

return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);

}

设置一些默认参数

public ScheduledThreadPoolExecutor(int corePoolSize,

ThreadFactory threadFactory) {

super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,

new DelayedWorkQueue(), threadFactory);

}

 

 

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

threadFactory, defaultHandler);//新增defaultHandler,它是默认的拒绝策略实现

}

 

private static final RejectedExecutionHandler defaultHandler =

new AbortPolicy();

调用父类构造方法,就是ThreadPoolExecutor 前面文章有分析,这里不详细讲解了。

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

注意到这里 corePoolSize==3,maximumPoolSize==Integer.MAX_VALUE,keepAliveTime==0,unit==TimeUnit.NANOSECONDS

workQueue== new DelayedWorkQueue()。

 

ScheduledExecutorService 常用方法如下

public interface ScheduledExecutorService extends ExecutorService {

//延迟delay 单位unit 执行command

public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);

//延迟delay 单位unit 执行callable

public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);

//延迟delay 单位unit, 以period 单位unit 为周期执行command

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);

 

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

 

}

 

以schedule方法为例

 

public ScheduledFuture<?> schedule(Runnable command,

long delay,

TimeUnit unit) {

if (command == null || unit == null)

throw new NullPointerException();

RunnableScheduledFuture<?> t = decorateTask(command,

new ScheduledFutureTask<Void>(command, null,

triggerTime(delay, unit)));

delayedExecute(t);

return t;

}

 

protected <V> RunnableScheduledFuture<V> decorateTask(

Runnable runnable, RunnableScheduledFuture<V> task) {

return task;

}

实际上返回了 new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit))

triggerTime 根据延迟时间计算出触发时间

private long triggerTime(long delay, TimeUnit unit) {

return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));

}

 

long triggerTime(long delay) {

return now() +

((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));

}

ScheduledFutureTask构造方法

ScheduledFutureTask(Runnable r, V result, long ns) {

super(r, result);

this.time = ns;//触发时间

this.period = 0;//触发周期等于0

this.sequenceNumber = sequencer.getAndIncrement();//

}

调用父类构造方法

public FutureTask(Runnable runnable, V result) {

sync = new Sync(Executors.callable(runnable, result));

}

任务执行分析

 

private void delayedExecute(RunnableScheduledFuture<?> task) {

if (isShutdown())//如果关闭 执行拒绝策略

reject(task);

else {

super.getQueue().add(task);//重要

if (isShutdown() && 线程池关闭

!canRunInCurrentRunState(task.isPeriodic()) &&//不可以运行

remove(task))//删除任务成功

task.cancel(false);执行取消

else

ensurePrestart();//启动线程

}

}

boolean canRunInCurrentRunState(boolean periodic) {

return isRunningOrShutdown(periodic ?

continueExistingPeriodicTasksAfterShutdown :

executeExistingDelayedTasksAfterShutdown);

}

 

void ensurePrestart() {

int wc = workerCountOf(ctl.get());

if (wc < corePoolSize) //工作线程数小于corePoolSize,启动工作线程

addWorker(null, true);

else if (wc == 0) //如果corePoolSize 至少启动一个工作线程

addWorker(null, false);

}

super.getQueue().add(task)分析

前面写到 workerQueue的类型是DelayedWorkQueue。

DelayedWorkQueue添加数据方法分析

public boolean add(Runnable e) {

return offer(e);

}

public boolean offer(Runnable x) {

if (x == null)

throw new NullPointerException();

RunnableScheduledFuture e = (RunnableScheduledFuture)x;

final ReentrantLock lock = this.lock;

lock.lock();

try {

int i = size;

if (i >= queue.length)

grow();//容量不够扩容

size = i + 1;

if (i == 0) {

queue[0] = e;

setIndex(e, 0);

} else {

siftUp(i, e);

}

if (queue[0] == e) {//队列不为空了 唤醒阻塞

leader = null;

available.signal();

}

} finally {

lock.unlock();

}

return true;

}

这个方法中只有siftUp不好理解,重点分析。

private void siftUp(int k, RunnableScheduledFuture key) {

while (k > 0) {//通过构建最小二叉堆排序

int parent = (k - 1) >>> 1;

RunnableScheduledFuture e = queue[parent];

if (key.compareTo(e) >= 0)

break;

queue[k] = e;

setIndex(e, k);

k = parent;

}

queue[k] = key;

setIndex(key, k);

}

siftUp与PriorityBlockingQueue 添加数据实例中的siftUpComparable处理逻辑相似。都是构建最小二叉堆。

与PriorityBlockingQueue 队列相同DelayedWorkQueue的获取方法也有相应的siftDown方法

DelayedWorkQueue获取方法分析

public RunnableScheduledFuture take() throws InterruptedException {

final ReentrantLock lock = this.lock;

lock.lockInterruptibly();

try {

for (;;) {//循环直到获取到数据

RunnableScheduledFuture first = queue[0];

if (first == null)//如果队列为空阻塞

available.await();

else {

//计算距离当前时间需要延迟的时间值

long delay = first.getDelay(TimeUnit.NANOSECONDS);

if (delay <= 0)//不需要延迟 直接获取

return finishPoll(first);

else if (leader != null) // 如果其他线程进入 延迟等待,等待直到被唤醒

available.await();//code1

else {//需要延迟 等待,有一个线程执行这里,其他获取数据线程需要等待

Thread thisThread = Thread.currentThread();

leader = thisThread;//标记执行延迟等待的线程

try {

available.awaitNanos(delay);

} finally {

if (leader == thisThread)

leader = null;//释放占据线程

}

}

}

}

} finally {

if (leader == null && queue[0] != null)

available.signal();//唤醒code1 等待的线程

lock.unlock();

}

}

 

finishPoll需要重点分析

 

private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) {

int s = --size;

RunnableScheduledFuture x = queue[s];

queue[s] = null;

if (s != 0)

siftDown(0, x);

setIndex(f, -1);

return f;

}

看到了DelayedWorkQueue获取方法的siftDown方法。siftDown方法的实现逻辑与PriorityBlockingQueue的siftDown相似。

 

DelayedWorkQueue与PriorityBlockingQueue 一样都使用了最小二叉堆,通过添加数据,获取数据时 构建二叉堆排序。二叉堆的数据结构和排序算法参照PriorityBlockingQueue分析篇。

 

compareTo分析

排序中用到的compareTo需要分析,key的类型是ScheduledFutureTask

public int compareTo(Delayed other) {

if (other == this) // compare zero ONLY if same object

return 0;//同一个实例,相等

if (other instanceof ScheduledFutureTask) {// 根据触发时间与other比较 。

ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;

long diff = time - x.time;

if (diff < 0)

return -1;

else if (diff > 0)

return 1;

else if (sequenceNumber < x.sequenceNumber)

return -1;//比较序号。先加入的任务序号小。

else

return 1;

}

//获取需要延迟的时间值比较

long d = (getDelay(TimeUnit.NANOSECONDS) -

other.getDelay(TimeUnit.NANOSECONDS));

return (d == 0) ? 0 : ((d < 0) ? -1 : 1);

}

compareTo 简略的逻辑就是根据 触发时间比较当前实例与其他实例的大小,

 

分析到这里,清楚了加入workerQueue中的数据时间,获取后是有序的,先获取触发时间比较接近的。

ScheduledFutureTask run分析

条件的任务实际上执行的是ScheduledFutureTask的run方法

public void run() {

boolean periodic = isPeriodic();//判断是否有周期

if (!canRunInCurrentRunState(periodic))//判断是否应该执行取消

cancel(false);

else if (!periodic)//非周期性 直接执行

ScheduledFutureTask.super.run();

else if (ScheduledFutureTask.super.runAndReset()) {//执行并重置任务状态

setNextRunTime();//根据时间周期计算下一次触发时间

reExecutePeriodic(outerTask);//outerTask 等于this, 将当前任务重新投入队列。

}

}

上面粗体部分是实现周期执行的关键。

逻辑总结:

如果应该取消,执行取消。 如果是非周期性任务值直接执行,如果是周期性任务,先执行并重置任务,然后根据时间周期计算下一次触发时间,最后将当前任务重新投入队列。

分析到这里 延迟执行和周期性的执行都清除是怎么样实现的了。

总结:ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,没有使用它的线程池框架,自己来实现。逻辑比较简单 将任务加入优先级队列DelayedWorkQueue, 然后启动核心线程数的工作线程。依赖DelayedWorkQueue实现延迟和排序,依赖ScheduledFutureTask实现周期性执行任务,理解DelayedWorkQueue的排序和延迟实现, 周期性任务执行的逻辑是本文的重点。