并发包Callable、Future和FutureTask源码分析

简介

线程任务执行如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

今天我们就来讨论一下Callable、Future和FutureTask三个类的使用方法。

Callable与Runnable

先说一下java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@FunctionalInterface
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}

由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下,它是一个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public interface Future<V> {

/**
* cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。
* 参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,
* 如果设置true,则表示可以取消正在执行过程中的任务。
* 如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,
* 即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,
* 则返回true,若mayInterruptIfRunning设置为false,则返回false;
* 如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
* @param mayInterruptIfRunning
* @return
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* 表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
* @return
*/
boolean isCancelled();

/**
* 表示任务是否已经完成,若任务完成,则返回true;
* @return
*/
boolean isDone();

/**
* 用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
V get() throws InterruptedException, ExecutionException;

/**
* 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
*
* @param timeout
* @param unit
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

RunnableFuture

RunnableFuture作为 Runnable 的 Future。成功执行 run 方法可以完成 Future 并允许访问其结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 作为 Runnable 的 Future。成功执行 run 方法可以完成 Future 并允许访问其结果。
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

FutureTask

我们先来看一下FutureTask的实现:

1
public class FutureTask<V> implements RunnableFuture<V>

FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

1
2
3
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
FutureTask提供了2个构造器:

1
2
3
4
public FutureTask(Callable<V> callable) {
}
public FutureTask(Runnable runnable, V result) {
}

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.fly.learn.aqs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;

/**
* 测试future
* @author: peijiepang
* @date 2018-12-11
* @Description:
*/
public class FuthreTest {
private final static Logger LOGGER = LoggerFactory.getLogger(FuthreTest.class);
private static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(100),new ThreadPoolExecutor.DiscardPolicy());

public static void main(String[] args) throws Exception {
Future futureOne = executorService.submit(new Runnable() {
@Override
public void run() {
LOGGER.info("start runable one");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Future futureTwo = executorService.submit(new Runnable() {
@Override
public void run() {
LOGGER.info("start runable two");
}
});

Future futureThree=null;
try {
futureThree = executorService.submit(new Runnable() {
@Override
public void run() {
LOGGER.info("start runable three");
}
});
} catch (Exception e) {
LOGGER.error(e.getLocalizedMessage());
}

Future<Integer> futureFour = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(5000L);
return 1;
}
});

FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(5000L);
return "ok";
}
});
executorService.submit(futureTask);

LOGGER.info("task one " + futureOne.get());
LOGGER.info("task two " + futureTwo.get());
LOGGER.info("task three " + (futureThree==null?null:futureThree.get()));
LOGGER.info("task fore "+futureFour.get());
LOGGER.info("futureTask "+futureTask.get());
executorService.shutdown();
}
}

FutureTask源码分析

任务提交

在使用ThreadPoolExecutor使用submit提交任务后然后交给线程池中的线程去执行,是把在ThreadPoolExecutor(其实是在AbstractExecutorService中)有如下几个submit方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}

/**
* Returns a {@code RunnableFuture} for the given callable task.
*
* @param callable the callable task being wrapped
* @param <T> the type of the callable's result
* @return a {@code RunnableFuture} which, when run, will call the
* underlying callable and which, as a {@code Future}, will yield
* the callable's result as its result and provide for
* cancellation of the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

/**
* Returns a {@code RunnableFuture} for the given runnable and default
* value.
*
* @param runnable the runnable task being wrapped
* @param value the default value for the returned future
* @param <T> the type of the given value
* @return a {@code RunnableFuture} which, when run, will run the
* underlying runnable and which, as a {@code Future}, will yield
* the given value as its result and provide for cancellation of
* the underlying task
* @since 1.6
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* 任务状态
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;//任务执行阶段,结果赋值前
private static final int COMPLETING = 1;//结果赋值阶段
private static final int NORMAL = 2;//任务执行完毕
private static final int EXCEPTIONAL = 3;//任务执行时发生异常
private static final int CANCELLED = 4;//任务被取消
private static final int INTERRUPTING = 5;//设置中断变量阶段
private static final int INTERRUPTED = 6;//任务中断

/** The underlying callable; nulled out after running */
private Callable<V> callable;
//任务返回值,正常返回时是泛型指定对象,任务异常时是Throwable对象,它在state=COMPLETING阶段完成赋值操作
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
//当前执行任务线程,run()方法开始时进行判断和赋值,保证同一时刻只有一个线程执行FutureTask,并且FutureTask.run()只能执行一次。
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
//阻塞队列头节点,每个节点存储调用FutureTask.get()方法,且采用LockSupport.park()阻塞的线程。在任务对outcome完成赋值后,调用finishCompletion()唤醒所有阻塞线程。
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

/**
* 将等待线程封装成节点,形成等待队列
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

//UNSAFE类提供高效并且线程安全方式操作变量,直接和内存数据打交道,
// 但是分配的内存需要手动释放。由于UNSAFE类只提供给JVM信任的启动类加载器所使用,这里采用反射获取UNSAFE对象。
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
//state参数在内存中对象的偏移量
private static final long stateOffset;
//runner参数在内存中对象的偏移量
private static final long runnerOffset;
//waiter参数在内存中对象的偏移量
private static final long waitersOffset;
static {
try {
//通过反射的方式来访问Unsafe类中的theUnsafe静态成员变量,该theUnsafe静态成员变量在Unsafe第一次使用时就已经初始化。
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}

isDown方法

1
2
3
4
5
6
7
/**
* 因为NEW表示任务执行阶段,因此判断任务是否完成
* @return
*/
public boolean isDone() {
return state != NEW;
}

cancel方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 如果任务在NEW阶段,mayInterruptIfRunning == true,且能CAS能修改state值,中断当前线程,调用finishCompletion()
* @param mayInterruptIfRunning
* @return
*/
public boolean cancel(boolean mayInterruptIfRunning) {
//判断当前任务状态是否为NEW,并且cas操作能否成功修改state值,如果不能,返回false
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
// 修改state为INTERRUPTED状态
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//唤醒所有调用get方法等待线程队列
finishCompletion();
}
return true;
}

isCancelled方法

1
2
3
4
5
6
7
/**
* 其实就是指线程中断中和中断状态
* @return
*/
public boolean isCancelled() {
return state >= CANCELLED;
}

主流程get()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
* 如果任务正在执行或者正在赋值,调用awaitDone()方法,阻塞当前线程,且封装成WaitNode,放入等待队列中
* 否则,调用report(),将outcome包装成返回参数类型。
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

/**
* 功能:制定线程等待结果的具体实行策略
*
* 逻辑
* for中判断当前线程是否被中断,如果是抛出异常
*
* 一、如果state == NEW(运行状态)
* 将当前线程包装成WaitNode,并CAS放入队列头部。(期间如果状态改变,可能其中某些步骤未进行)
* 如果timed == true,计算超时时间,已经超过设置时间,移除队列节点,返回
* 未超过设置时间,阻塞线程到设置时间
* 这几个操作是层级关系,如果一直保持state == NEW,将进行:
* 创建WaitNode
* CAS放入队列头部
* 超时移除节点并返回
* 未超时调用parkNanos()阻塞线程
* 二、如果state == COMPLETING(赋值阶段)
* 当前线程yield(),等待状态更新
*
* 三、如果state > COMPLETING
* 将等待队列中该线程节点的thread参数设置为null
*
* System.nanoTime :返回的可能是任意时间,甚至为负,相对于System.concurrentTime更加精确。
* 用于记录一个时间段。
*
* 返回结果:state
* state <= COMPLETING,表示获取结果失败,可能是超时,可能是执行错误
* state > COMPLETING,表示获取结果成功
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
//判断当前线程是否被中断,中断将移除等待队列中的
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
//在队列头部添加q,并将q赋值给waiters
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//将线程阻塞,设置阻塞时间为nanos
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

/**
* 功能:返回任务结果
* 描述: 由于在set(),setException() 方法中设置了otcome的值,
* 可能是throwable对象,也可能是正常返回结果,所以需要
* 对outcome进行一次处理
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

/**
* 定时获取结果
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if(unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* 判断:状态是否为NEW,并CAS成功将runner线程赋值为当前线程
* 线程调用run(),进行判断,运行callable.call(),再调用set()将结果设置到outcome。
* 失败调用setException(),将Throwable对象设置到outcome。
*/
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

/**
* 设置返回结果
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

/**
* 通过waiters参数,唤醒等待队列,所有因为调用get()方法而阻塞的线程 LockSupport.unpark():线程取消阻塞
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}

/**
* Protected method invoked when this task transitions to state
* {@code isDone} (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
*/
protected void done() { }

总结

FutureTask内部维护一个任务状态,任何操作都是围绕着这个状态进行,并随时更新任务状态。任务发起者调用get()获取执行结果的时候,如果任务还没有执行完毕,则会把自己放入阻塞队列中然后进行阻塞等待。当任务执行完成之后,任务执行线程会依次唤醒阻塞等待的线程。调用cancel()取消任务的时候也只是简单的修改任务状态,如果需要中断任务执行线程的话则调用Thread.interrupt()中断任务执行线程。