/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class<?> ensureLoaded = LockSupport.class; }
// Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class<?> ensureLoaded = LockSupport.class; }
publicvoidrun() { //如果task不是初始状态(代表已经启动)则直接返回 //如果设置运行task的RUNNER线程(设置为当前线程)失败也直接返回 if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; //运行,处理异常 try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) //设置结果 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts ints= state; if (s >= INTERRUPTING) //处理中断异常 handlePossibleCancellationInterrupt(s); } }
protectedbooleanrunAndReset() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) returnfalse; booleanran=false; ints= state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
privatevoidhandlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
public V get()throws InterruptedException, ExecutionException { ints= state; //如果没完成就一直等着,直到完成 if (s <= COMPLETING) s = awaitDone(false, 0L); //根据完成后的状态(NORMAL、CANCELLED、INTERRUPTED) //决定返回结果(result or exception) return report(s); }
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) thrownewNullPointerException(); ints= state; //等了确定的时间之后还没完成就抛异常 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) thrownewTimeoutException(); return report(s); }
privateintawaitDone(boolean timed, long nanos) throws InterruptedException { // The code below is very delicate, to achieve these goals: // - call nanoTime exactly once for each call to park // - if nanos <= 0L, return promptly without allocation or nanoTime // - if nanos == Long.MIN_VALUE, don't underflow // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic // and we suffer a spurious wakeup, we will do no worse than // to park-spin for a while longstartTime=0L; // Special value 0L means not yet parked WaitNodeq=null; booleanqueued=false; for (;;) { ints= state; //如果是终态直接返回(终态可以是NORMAL,CANCELLED等) if (s > COMPLETING) { //把当前线程创建的WaitNode中的thread字段置为null //那么这个节点就会被从waiters中清除(具体怎么做的往下看) if (q != null) q.thread = null; return s; } //如果执行完了,正在设置结果,就等一会儿 elseif (s == COMPLETING) // We may have already promised (via isDone) that we are done // so never return empty-handed or throw InterruptedException Thread.yield(); //如果检测到自己被中断,则直接不等了(从阻塞队列中删除),抛出异常 elseif (Thread.interrupted()) { removeWaiter(q); thrownewInterruptedException(); } //如果自己还没入队,而且等待时间没到期,就为自己创建节点 elseif (q == null) { if (timed && nanos <= 0L) return s; q = newWaitNode(); } //这一行逻辑有点多,是先把上面创建的q指向队头,再把队头CAS成q elseif (!queued) queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); //如果调用的是有等待时间的get就走里面的逻辑 elseif (timed) { //算一下要阻塞多久 finallong parkNanos; if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { longelapsed= System.nanoTime() - startTime; if (elapsed >= nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } // nanoTime may be slow; recheck before parking if (state < COMPLETING) LockSupport.parkNanos(this, parkNanos); } //调用park方法,真正阻塞当前线程 else LockSupport.park(this); } }
public V get()throws InterruptedException, ExecutionException { ints= state; //如果没完成就一直等着,直到完成 if (s <= COMPLETING) s = awaitDone(false, 0L); //根据完成后的状态(NORMAL、CANCELLED、INTERRUPTED) //决定返回结果(result or exception) return report(s); }
在完成了 awaitDone 之后,最后一步就是 report,代码如下:
1 2 3 4 5 6 7 8
/**
 * Turns a completed task's state into a return value or an exception.
 *
 * @param s the completed state value
 * @return {@code outcome} cast to {@code V} when {@code s == NORMAL}
 * @throws CancellationException when {@code s >= CANCELLED}
 * @throws ExecutionException wrapping {@code outcome} for any other state
 */
@SuppressWarnings("unchecked") // outcome is stored as Object, so the cast to V cannot be checked
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
这个逻辑就简单明了了,正常就返回结果,取消或中断就抛异常
finishCompletion()
还剩最后一个问题:当 FutureTask 执行完毕之后,是如何唤醒所有阻塞线程的?
还记得在run方法的最后执行了什么吗?
执行的是下面这个set方法,把结果set到outcome里面,从而被get获取到
1 2 3 4 5 6 7
protectedvoidset(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); } }