【多线程】线程调用之 FutureTask 代码 简析
多线程调用 分为两种基本情况
1.需要返回结果
2.不需要返回结果
JAVA线程的基础实现
不需要返回结果:
new Thread(new Runnable(){
public void run() { /**具体的内容*/}
})
需要返回结果:
Callable<String> userCall= new UserCall(); FutureTask<String> ft= new FutureTask<>(userCall); Thread t = new Thread(ft); t.start(); String name= ft.get();
今天重点解析 FutureTask需要返回结果的 线程任务
如果读过FutureTask就知道 FutureTask 分两个部分
1. 执行 目标任务部分
2. 获取 目标任务返回结果部分
FutureTask是何方神圣?先看结构图
Runnable接口大家熟悉,里面就一个 run方法直接跳过
重点看Future接口
public interface Future<V> { /**取消任务*/ boolean cancel(boolean mayInterruptIfRunning); /**是否取消任务*/ boolean isCancelled(); /***/ boolean isDone(); /**无超时时间获取任务结果*/ V get() throws InterruptedException, ExecutionException; /**在超时时间内获取任务结果*/ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
RunnableFuture 接口有疑问?
按道理说一个空的接口即可?网上回答也各种不一致
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
Callable 接口
public interface Callable<V> { V call() throws Exception; }
实际运行的目标对象,运行完毕目标任务后,返回结果
以上 接口的逻辑关系梳理完毕后,直奔主题
1. 执行 目标任务部分
执行目标部分 ,重点介绍 FutureTask的 状态位
一共7个
private volatile int state; /**新建FutureTask初始化的状态*/ private static final int NEW = 0; /**任务运行结束,等待善后工作*/ private static final int COMPLETING = 1; /**任务运行结束,且善后工作处理完毕,注意正常情况下,只有善后工作处理完毕,才算正常的future.get可以获取结果*/ private static final int NORMAL = 2; /**任务执行出现业务异常*/ private static final int EXCEPTIONAL = 3; /**目标任务被取消*/ private static final int CANCELLED = 4; /***/ private static final int INTERRUPTING = 5; /***/ private static final int INTERRUPTED = 6;
下面边拆机边分析
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
此地可以看出 初始化状态 state=NEW 即 state=0;
以下是FutureTask 的run方法
public void run() { /**1.先看看当前futureTask的状态是否是NEW,且通过CAS 设置runner 如果 state!=NEW 或者 设置当前线程的引用到 FutureTask失败,直接返回 * 注意此 线程为 执行目标任务的线程 */ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; /**2. 目标任务不为空,且 FutureTask的state状态 ==NEW ,执行目标任务*/ if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } /**3.如果目标任务正常执行完毕,设置结果*/ if (ran) set(result); } } finally { /***4.善后工作,将runner 即 引用的当前运行的Thread引用置为空 * 如果 Future里面的state状态 >=INTERRUPTING * 中断处理 */ // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
步骤总结
1. 比对FutureTask状态 如果 state不是NEW=0 新建,或者 设置当前线程引用失败,直接返回
2.再次检查state状态 ==NEW且目标任务不能为空,执行目标任务
3.目标任务成功执行,设置执行结果 重点注意 这个设置结果,有核心逻辑稍后分析 ,这个 protected void set(V v) ;
protected void set(V v) { //CAS将 state状态从NEW=0 设置为 COMPLETING=1即 state从0 调整为1 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //将返回结果设置给 outcome outcome = v; //更新状态 从1 到 2 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state /**这个 有两个地方调用,主要作用是 通过 CAS 唤起 等待的 结果的线程, 主要注意点: WaitNode 的结构 以及循环处理的过程 ***/ finishCompletion(); } }
4. finally 最终善后工作 ,移除FutureTask对当前线程的引用,处理中断
以上的是正常的执行流程。
如果目标任务执行的时候抛出异常,
result = null; //设置结果为空
ran = false; // 执行结果标记伟false
setException(ex);//设置异常结果
protected void setException(Throwable t) { // 通过CAS将FutureTask的状态 从 0 设置为 1 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //将要抛出的异常引用设置给FutureTask的outcome 以供后续的异常处理 outcome = t; //将state直接设置为EXCEPTIONAL =3 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //这个 有两个地方调用,主要作用是 通过 CAS 唤起 等待的 结果的线程, // 主要注意点: WaitNode 的结构 finishCompletion(); } }
2. 获取 目标任务返回结果部分
String name= futureTask.get();
重点注意 ,futureTask 可以被多个线程引用,这个就是
入口代码如下:
public V get() throws InterruptedException, ExecutionException { int s = state; /**FutureTask的状态<=COMPLETING 即 state<=1 证明目标任务尚未完全处理完毕*/ if (s <= COMPLETING) /**将当前获取结果的线程加入等待节点队列*/ s = awaitDone(false, 0L); /***s>COMPLETING 任务已经出结果了,不论是NORMAL 或者是 *NORMAL= 2; * EXCEPTIONAL= 3; *CANCELLED= 4; *INTERRUPTING = 5; *INTERRUPTED= 6;*/ return report(s); }
返回结果:
1. 判断状态state=NORMAL 即任务正常结束 ,直接返回结果
2.如果 state>=CANCELLED 抛出取消异常
3. 否则 就是目标任务执行过程中抛出的业务异常信息
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
核心代码:
awaitDone 的步骤:
1.计算超时的时间,声明空的WaitNode对象,以及 是声明否排队的 queued状态=false
2. 进入无限循环
2.1 在无限循环中判断是否获取 等待结果的线程是否中断,如果中断,则删除当前中断的等待线程节点,并对外抛出中断异常。
2.2 state>COMPLETING (1)证明 已经出来具体结果解除 节点对等待获取结果的线程的引用,返回state
2.3 state ==COMPLETING 目标执行完毕,但是尚未结束收尾工作,等待结果的线程让出CPU
2.4 如果 节点为空,说明刚调用 等待动作,尚未加入等待列表,先创建一个等待节点
2.5 如果需要排队,则 按照单链表的顺序,先来后到的加入到节点尾部
2.6如果超时,移除节点,返回结果
2.7 是否有超时等待设置,如果有,则判断是否超时,超时则移除当前等待的节点并返回状态,否则 等待固定的时间
2.8 上述情况都不满足,一直等待,直到 调用unpak唤醒当前线程
private int awaitDone(boolean timed, long nanos)
throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
1