FutureTask简介

FutureTask类实现了Runnable和Future接口,可以异步获取执行结果

具体使用细节可以参考FutureTask介绍具体细节不再赘述,这里主要关注FutureTask的源码实现

本文对FutureTask中各个主要的调用方法源码进行了分析

这篇文章中Java代码是JDK11版本,部分实现和JDK1.8有出入,但是原理大同小异

FutureTask类成员

首先介绍一下这个类中定义了哪些类成员
在JDK11的FutureTask源码中,定义了如下的变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}


private static final VarHandle STATE;
private static final VarHandle RUNNER;
private static final VarHandle WAITERS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
STATE = l.findVarHandle(FutureTask.class, "state", int.class);
RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}

// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}

跟Java AQS框架类似,FutureTask核心就是3个部分:state、等待队列(并发)、CAS操作。

(AQS的实现原理我尽快肝一篇)

下面详细介绍一下各个字段(outcome和callable字段源码注释讲得比较清楚,就不展开了)

state

state字段就是做状态控制,虽然多,但是状态只有三个分类,而且状态转移过程非常简单清晰

状态分为

  • 初始状态 NEW, 也就是创建了任务还没执行或者还没执行完
  • 中间状态 COMPLETING, INTERRUPTING 分别是执行完了正在写结果和正在中断运行任务的线程
  • 终态
    1. NORMAL:正常执行结束
    2. EXCEPTIONAL:发生异常
    3. CANCELLED:任务取消
    4. INTERRUPTED:任务被中断

状态转移过程

  • NEW –> COMPLETING –> NORMAL
  • NEW –> COMPLETING –> EXCEPTIONAL
  • NEW –> INTERRUPTING –> INTERRUPTED
  • NEW –> CANCELLED

WaitNode等待队列

1
2
3
4
5
6
7
8
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

可以看到waitors是一个单向链表的表头,主要存放就是因为get方法而被阻塞住的其他线程,和AQS中的双向链表类似(思考问题:为什么这里只用单向链表,不像AQS中使用双向链表),实际上这里是把waitor当做一个线程安全的栈来使用的,也就是源码注释中的Treiber stack,具体实现逻辑会在后面的代码中体现

VarHandle部分

可以看到最后还包括了三个VARHandle类型变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static final VarHandle STATE;
private static final VarHandle RUNNER;
private static final VarHandle WAITERS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
STATE = l.findVarHandle(FutureTask.class, "state", int.class);
RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}

// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}

这是JDK9之后的新特性,是指向特定变量的强引用,可以完成对原变量的一些赋值和CAS的操作(这些操作之前需要通过UNSAFE类去实现),相当于为用户提供了更方便的CAS操作方式

具体见VarHandle介绍

这里使用Handler就是为了对队列和state字段去做CAS操作,保证其线程安全(JDK1.8没有VarHandle,是拿UNSAFE类去做的CAS操作)

FutureTask 构造方法

FutureTask的构造方法总共就俩,基本上都是用第一个方法

FutureTask(Callable callable)

这是最常用的FutureTask构造方法,callable对象中存放了具体的执行逻辑,通过泛型V指定了返回类型,并且将返回数据赋值给FutureTask的内部对象outcome(具体过程后面细说),通过FutureTask的get方法获得调用结果

1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

FutureTask(Runnable runnable, V result)

这个构造方法不太常用,因为传递的是一个Runnable类,Runnable类是没有返回值的,所以result是事先定义好的,FutureTask类的get方法会直接返回这个定义好的result

1
2
3
4
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

看代码可以发现Executors把Runnable对象转换成了一个callable对象,这里用到了适配器设计模式,具体转换过程如下

1
2
3
4
5
6
7
8
9
10
11
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}

关于适配器模式的介绍可以参考适配器模式

FutureTask.run()

run方法源码如下,为了方便,我直接在源码上给出中文注释讲解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void run() {
//如果task不是初始状态(代表已经启动)则直接返回
//如果设置运行task的RUNNER线程(设置为当前线程)失败也直接返回
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
//运行,处理异常
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//设置结果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
//处理中断异常
handlePossibleCancellationInterrupt(s);
}
}

其中比较重要的步骤在set(),可以看到,只有在set期间才会发生状态从new到COMPLETING到NORMAL的迁移,而set中最后一行调用的finishCompletion方法非常重要,这个方法会唤醒所有因为调用get()方法而被阻塞住的线程,通知他们获取结果(稍后讲到get方法会细说)

1
2
3
4
5
6
7
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}

由于不调用set方法,就不会把task标记为completing和normal,因此还有一种不太常用的runAndReset方法,在执行完毕之后不会调用set()方法,这样就不会有返回值,同时也可以让task重复调用多次,显然如果不调用run方法,就不能使用get方法,因为只有run方法会把task状态设置为NORMAL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected boolean runAndReset() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

最后简单关注一下run方法中的handlePossibleCancellationInterrupt,如果run的过程中被cancel打断,就需要处理中断,这个在cancel方法中细讲,这里看到线程只是通过yield方法通知上层调度器自己可以让出CPU,等待中断处理完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt

// assert state == INTERRUPTED;

// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}

FutureTask.cancel()

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean cancel(boolean mayInterruptIfRunning) {
//根据用户自定义的是否允许中断的标识mayInterruptIfRunning进行状态转移
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
//唤醒因为调用get而阻塞的线程
finishCompletion();
}
return true;
}

可以看到用户通过mayInterruptIfRunning变量决定其是否可以被中断,如果可以中断就去设置线程的中断标识,完成中断处理,最后同样会通知所有因为get方法而陷入阻塞的进程,不同的是这些进程无法获得结果,会获取到抛出的CancellationException(在讲get方法的时候会细讲)

Cancel和Interrupt的区别

canel就是直接取消任务,get方法抛出异常

Interrupt除了抛出异常外,还会通过设置Interrupt变量,让线程自身可以感知到是否被中断,然后由线程自身决定作出什么处理(这部分逻辑需要用户自己在代码中实现,具体怎么实现才可以高效相应中断,可以参考这篇论文 Cancellation in Systems

FutureTask.get()

终于来到了最后一个方法,也是最重要的方法get!

前面一直提到如果还没有执行完毕,那么调用get方法的线程都会陷入阻塞队列中(还记得前面提到的waiters变量吗),一旦task执行完毕并且设置完成了结果,就会唤醒所有的阻塞线程并且返回结果

让我们康康get方法的源码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public V get() throws InterruptedException, ExecutionException {
int s = state;
//如果没完成就一直等着,直到完成
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//根据完成后的状态(NORMAL、CANCELLED、INTERRUPTED)
//决定返回结果(result or exception)
return report(s);
}

public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
//等了确定的时间之后还没完成就抛异常
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

看着非常简单是因为复杂冗长的逻辑被放进awaitDone方法里面了
这样有助于我们理解get方法的整体思想(见我写在源代码上的中文注释)

显然get中的核心方法是awaitDone和report

awaitDone

先看看第一个,代码大致逻辑我写在中文注释里面了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
//如果是终态直接返回(终态可以是NORMAL,CANCELLED等)
if (s > COMPLETING) {
//把当前线程创建的WaitNode中的thread字段置为null
//那么这个节点就会被从waiters中清除(具体怎么做的往下看)
if (q != null)
q.thread = null;
return s;
}
//如果执行完了,正在设置结果,就等一会儿
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
//如果检测到自己被中断,则直接不等了(从阻塞队列中删除),抛出异常
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
//如果自己还没入队,而且等待时间没到期,就为自己创建节点
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
//这一行逻辑有点多,是先把上面创建的q指向队头,再把队头CAS成q
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
//如果调用的是有等待时间的get就走里面的逻辑
else if (timed) {
//算一下要阻塞多久
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
//调用park方法,真正阻塞当前线程
else
LockSupport.park(this);
}
}

为什么要写这么多分支

我第一遍看代码的时候很不解:很多逻辑明明可以写一起,为什么要分开写,比如:当前线程创建好了WaitNode节点之后,为什么不直接把它加入waiters等待队列,非要等到下个循环,用另一个if条件把它加进去,同理,又要等到下一个循环才能真地调用park,阻塞住当前进程?

经过一番思考我认为这么做是为了尽可能推迟阻塞线程的时间,因为线程阻塞和切换的开销还是比较大的,万一多走两遍for循环,运行task的主线程就把任务做好了,那调用get的线程就不需要真正地阻塞了,可以在下一个循环直接返回结果

这样又有新的问题,如果有一个线程的WaitNode已经加入了waiters中,那么此时怎么将其移除呢?对于这个问题,FutureTask类给出了非常巧妙的解法

首先是假设一个线程的WaitNode已经加入了waiters,然而在下一个for循环的时候,FutureTask任务已经执行完毕了,那么此时就会进入这个条件

1
2
3
4
5
6
7
8
//如果是终态直接返回(终态可以是NORMAL,CANCELLED等)
if (s > COMPLETING) {
//把当前线程创建的WaitNode中的thread字段置为null
//那么这个节点就会被从waiters中清除(具体怎么做的往下看)
if (q != null)
q.thread = null;
return s;
}

可以看到,返回状态之前,先把当前节点的thread字段置为null,可是这也没有从waiters中删除啊??

此时就需要注意,上面的awaitDone中有两个地方调用了removeWaiter方法,让我们看看这个方法是什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void removeWaiter(WaitNode node) {
if (node != null) {
//先把node的thread字段置为null
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
//从队头开始找,找到thread不为null的节点才会连接在q后面,要不就跳过
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
//出现竞争立马重试
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}

可以发现removeWaiter方法不只是删除了node节点,实际上删除了所有thread为null的节点,也就是说只要我们把一个队列中的节点的thread字段设置为null,那么后续只要removeWaiter被调用,该节点就会被删除。

report

回顾一下刚才讲的get方法的伪代码

1
2
3
4
5
6
7
8
9
public V get() throws InterruptedException, ExecutionException {
int s = state;
//如果没完成就一直等着,直到完成
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//根据完成后的状态(NORMAL、CANCELLED、INTERRUPTED)
//决定返回结果(result or exception)
return report(s);
}

在完成 了awaitDone之后的最后一步就是report
代码如下:

1
2
3
4
5
6
7
8
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

这个逻辑就简单明了了,正常就返回结果,取消或中断就抛异常

finishCompletion()

还剩最后一个问题:当FutureTask执行完毕之后,是如何唤醒所有阻塞线程的

还记得在run方法的最后执行了什么吗?

执行的是下面这个set方法,把结果set到outcome里面,从而被get获取到

1
2
3
4
5
6
7
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}

set方法里面最后调用了一个finishCompletion方法,这个方法的就是唤醒了所有的阻塞线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (WAITERS.weakCompareAndSet(this, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}

没啥好说的,就是CAS把队头置为null,然后沿着链表一路unpark下去就好了

最后的done方法是个空方法,子类继承FutureTask可以重写done,做一些后置处理的逻辑

FutureTask的总结

OK以上就是FutureTask的几乎全部源码,有一些简单的函数被我忽略了,像是isCancelled那种根据state返回的,就没有列举了

可以发现FutureTask的核心方法是get,当执行的task还没结束并设置结果的时候,所有调用get方法的线程都会被阻塞并且放入waiters队列,直到task执行完毕或者取消。同时FutureTask使用了很多巧思来尽可能减少线程的阻塞,尽可能利用CAS操作保证线程安全。

其实这一套阻塞队列的东西和Java中的锁很像:没有拿到锁的线程也会被阻塞放入队列,直到锁被释放才会被唤醒,获取锁并且执行对应的操作。

Java中锁的机制主要就两大类:Synchronized关键字和ReentrantLock为代表的锁

前者是JVM层面的锁,底层通过C++实现(看看俺后续能不能学到这么深,要是学了就写一篇博客讲讲)

后者是JAVA层面的锁,基于大名鼎鼎的AQS(抽象队列同步器)框架,AQS很多设计思想和实现跟FutureTask非常类似,内部也是通过state进行状态控制,通过一个双向链表实现一个阻塞队列存放挂起的线程,笔者尽量在一个月之内把AQS的源码分析写出来,结合其常见的锁的类型讲讲。预计一个月是因为还需要同步更新MicroRPC的吧项目和博客~

anyway关于FutureTask的内容就先写到这里啦,有任何问题可以通过邮件与俺交流~