ReentrantLock之Condition源码分析
ReentrantLock 定义
Condition是JUC里面提供于控制线程释放锁, 然后进行等待其他获取锁的线程发送 signal 信号来进行唤醒的工具类.
主要特点
- Condition内部主要是由一个装载线程节点 Node 的 Condition Queue 实现
- 对 Condition 的方法(await, signal等) 的调用必需是在本线程获取了独占锁的前提下
- 因为操作Condition的方法的前提是获取独占锁, 所以 Condition Queue 内部是一条不支持并发安全的单向 queue (这是相对于 AQS 里面的 Sync Queue)
Demo实例
从如下执行结果来看,线程1先await释放锁,然后线程2获取到锁,接着线程2唤醒等待锁,然后线程2释放锁,最后线程1解锁等待中的锁。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55package com.fly.learn.reentrantlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author: peijiepang
* @date 2018/11/8
* @Description:
*/
public class ConditionTest extends Thread{
private final static Logger LOGGER = LoggerFactory.getLogger(ConditionTest.class);
private ReentrantLock lock = null;
private Condition condition = null;
public ConditionTest(String name,ReentrantLock lock,Condition condition) {
super(name);
this.lock = lock;
this.condition = condition;
}
public void run() {
lock.lock();
try{
LOGGER.info("thread name:{} lock success.",Thread.currentThread().getName());
if(Thread.currentThread().getName().equals("test1")){
condition.await();//释放锁,然后等待唤醒
LOGGER.info("thread name:{} 被唤醒,即将unlock.",Thread.currentThread().getName());
}else if(Thread.currentThread().getName().equals("test2")) {
condition.signal();//唤醒等待线程
LOGGER.info("thread name:{} 唤醒队列中的线程,即将unlock.",Thread.currentThread().getName());
}
}catch (InterruptedException ex){
ex.printStackTrace();
}finally {
lock.unlock();
LOGGER.info("thread name:{} unlock success.",Thread.currentThread().getName());
}
}
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
ConditionTest test1 = new ConditionTest("test1",reentrantLock,condition);
ConditionTest test2 = new ConditionTest("test2",reentrantLock,condition);
test1.start();
test2.start();
}
}1
2
3
4
5
62018-11-08 23:17:50.065 [test1] INFO c.f.l.reentrantlock.ConditionTest - thread name:test1 lock success.
2018-11-08 23:17:50.072 [test2] INFO c.f.l.reentrantlock.ConditionTest - thread name:test2 lock success.
2018-11-08 23:17:50.072 [test2] INFO c.f.l.reentrantlock.ConditionTest - thread name:test2 唤醒队列中的线程,即将unlock.
2018-11-08 23:17:50.072 [test2] INFO c.f.l.reentrantlock.ConditionTest - thread name:test2 unlock success.
2018-11-08 23:17:50.072 [test1] INFO c.f.l.reentrantlock.ConditionTest - thread name:test1 被唤醒,即将unlock.
2018-11-08 23:17:50.072 [test1] INFO c.f.l.reentrantlock.ConditionTest - thread name:test1 unlock success.类图
源码分析
- 构造函数
1
2
3
4
5
6
7
8
9
10
11/** First node of condition queue. */
/** Condition Queue 里面的头节点 */
private transient Node firstWaiter;
/** Last node of condition queue. */
/** Condition Queue 里面的尾节点 */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { } - Condition Queue enqueue节点方法 addConditionWaiter
addConditionWaiter方法主要用于调用 Condition.await 时将当前节点封装成 一个Node, 加入到 Condition Queue里面.
大家可以注意下, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
* Adds a new waiter to wait queue
* 将当前线程封装成一个 Node 节点 放入大 Condition Queue 里面
* 大家可以注意到, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况
* @return
*/
private Node addConditionWaiter() {
Node t = lastWaiter; // 1. Condition queue 的尾节点
// If lastWaiter is cancelled, clean out. // 2.尾节点已经Cancel, 直接进行清除,
// 这里有1个问题, 1 何时出现t.waitStatus != Node.CONDITION -> 在对线程进行中断时 ConditionObject -> await -> checkInterruptWhileWaiting -> transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 导致这种情况一般是 线程中断或 await 超时
// 一个注意点: 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其他 await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal 进行唤醒时 node.nextWaiter 会被置空, 而中断和超时时不会)
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 3. 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt))
t = lastWaiter; // 4. 获取最新的 lastWaiter
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);// 5. 将线程封装成 node 准备放入 Condition Queue 里面
if (t == null)
firstWaiter = node; // 6 .Condition Queue 是空的
else
t.nextWaiter = node; // 7. 最加到 queue 尾部
lastWaiter = node; // 8. 重新赋值 lastWaiter
return node;
} - Condition 唤醒 first节点方法 doSignal
这里的唤醒指的是将节点从 Condition Queue 转移到 Sync Queue 里面1
2
3
4
5
6
7
8
9
10
11/**
* 唤醒 Condition Queue 里面的头节点, 注意这里的唤醒只是将 Node 从 Condition Queue 转到 Sync Queue 里面(这时的 Node 也还是能被 Interrupt)
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) // 1. 将 first.nextWaiter 赋值给 nextWaiter 为下次做准备
lastWaiter = null; // 2. 这时若 nextWaiter == null, 则说明 Condition 为空了, 所以直接置空 lastWaiter
first.nextWaiter = null;
} while (!transferForSignal(first) && // 3. first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt
(first = firstWaiter) != null); // 4. 调用 transferForSignal将 first 转移到 Sync Queue 里面, 返回不成功的话, 将 firstWaiter 赋值给 first
} - Condition 唤醒 所有 节点方法 doSignalAll
1
2
3
4
5
6
7
8
9
10
11
12/**
* 唤醒 Condition Queue 里面的所有的节点
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null; // 1. 将 lastWaiter, firstWaiter 置空
do {
Node next = first.nextWaiter; // 2. 初始化下个换新的节点
first.nextWaiter = null; // 3. first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt
transferForSignal(first); // 4. 调用 transferForSignal将 first 转移到 Sync Queue 里面
first = next; // 5. 开始换新 next 节点
} while (first != null);
} - Condition 删除取消节点的方法 unlinkCancelledWaiters
一般的节点都会被 signal 唤醒, 从 Condition Queue 转移到 Sync Queue, 而若遇到 interrupt 或 等待超时, 则直接改变 node 的状态(从 CONDITION 变成 0), 并直接放入 Sync 里面, 而不清理Condition Queue 里面的节点, 所以需要下面的函数
毫无疑问, 这是一段非常精巧的queue节点删除, 主要还是在 节点 trail 上, trail 节点可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23/**
* 在 调用 addConditionWaiter 将线程放入 Condition Queue 里面时 或 awiat 方法获取 差不多结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点
* 这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter; // 1. 先初始化 next 节点
if (t.waitStatus != Node.CONDITION) { // 2. 节点不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)
t.nextWaiter = null; // 3. Node.nextWaiter 置空
if (trail == null) // 4. 一次都没有遇到有效的节点
firstWaiter = next; // 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理)
else
trail.nextWaiter = next; // 6. next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t
if (next == null)
lastWaiter = trail; // 7. next == null 说明 已经 traverse 完了 Condition Queue
}
else
trail = t; // 8. 将有效节点赋值给 trail
t = next;
}
} - Condition 唤醒首节点方法 signal
1
2
3
4
5
6
7
8
9
10
11
12/**
* 将 Condition queue 的头节点转移到 Sync Queue 里面
* 在进行调用 signal 时, 当前的线程必须获取了 独占的锁
*/
public final void signal() {
if (!isHeldExclusively()) // 1. 判断当前的线程是否已经获取 独占锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 2. 调用 doSignal 进行转移
} - Condition 唤醒所有节点方法 signalAll
1
2
3
4
5
6
7
8
9
10/**
* 将 Condition Queue 里面的节点都转移到 Sync Queue 里面
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
} - Condition 释放锁进行等待方法 awaitUninterruptibly
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* 不响应线程中断的方式进行 await
*/
public final void awaitUninterruptibly() {
Node node = addConditionWaiter(); // 1. 将当前线程封装成一个 Node 放入 Condition Queue 里面
int savedState = fullyRelease(node); // 2. 释放当前线程所获取的所有的独占锁(PS: 独占的锁支持重入), 等等, 为什么要释放呢? 以为你调用 awaitUninterruptibly 方法的前提就是你已经获取了 独占锁
boolean interrupted = false; // 3. 线程中断标识
while (!isOnSyncQueue(node)) { // 4. 这里是一个 while loop, 调用 isOnSyncQueue 判断当前的 Node 是否已经被转移到 Sync Queue 里面
LockSupport.park(this); // 5. 若当前 node 不在 sync queue 里面, 则先 block 一下等待其他线程调用 signal 进行唤醒; (这里若有其他线程对当前线程进行 中断的换, 也能进行唤醒)
if (Thread.interrupted()) // 6. 判断这是唤醒是 signal 还是 interrupted(Thread.interrupted()会清楚线程的中断标记, 但没事, 我们有步骤7中的interrupted进行记录)
interrupted = true; // 7. 说明这次唤醒是被中断而唤醒的,这个标记若是true的话, 在 awiat 离开时还要 自己中断一下(selfInterrupt), 其他的函数可能需要线程的中断标识
}
if (acquireQueued(node, savedState) || interrupted) // 8. acquireQueued 返回 true 说明线程在 block 的过程中式被 inetrrupt 过(其实 acquireQueued 返回 true 也有可能其中有一次唤醒是 通过 signal)
selfInterrupt(); // 9. 自我中断, 外面的线程可以通过这个标识知道, 整个 awaitUninterruptibly 运行过程中 是否被中断过
} - Condition 释放锁 进行等待的方法 await
await 此方法响应中断请求, 当接受到中断请求后会将节点从 Condition Queue 转移到 Sync Queue1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22/**
* 支持 InterruptedException 的 await <- 注意这里即使是线程被中断,
* 还是需要获取了独占的锁后, 再 调用 lock.unlock 进行释放锁
*/
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 1. 判断线程是否中断
throw new InterruptedException();
Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 4. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
LockSupport.park(this); // 5. 当前线程没在 Sync Queue 里面, 则进行 block
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)// 6. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)// 7. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled // 8. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
unlinkCancelledWaiters(); // 9. 进行 cancelled 节点的清除
if (interruptMode != 0) // 10. "interruptMode != 0" 代表通过中断的方式唤醒线程
reportInterruptAfterWait(interruptMode); // 11. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
} - Condition 释放锁 进行等待的方法 awaitNanos
awaitNanos 具有超时功能, 与响应中断的功能, 不管中断还是超时都会 将节点从 Condition Queue 转移到 Sync Queue1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35/**
* 所有 awaitXX 方法其实就是
* 0. 将当前的线程封装成 Node 加入到 Condition 里面
* 1. 丢到当前线程所拥有的 独占锁,
* 2. 等待 其他获取 独占锁的线程的唤醒, 唤醒从 Condition Queue 到 Sync Queue 里面, 进而获取 独占锁
* 3. 最后获取 lock 之后, 在根据线程唤醒的方式(signal/interrupt) 进行处理
* 4. 最后还是需要调用 lock./unlock 进行释放锁
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted()) // 1. 判断线程是否中断
throw new InterruptedException();
Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
final long deadline = System.nanoTime() + nanosTimeout; // 4. 计算 wait 的截止时间
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 5. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
if (nanosTimeout <= 0L) { // 6. 等待时间超时(这里的 nanosTimeout 是有可能 < 0),
transferAfterCancelledWait(node); // 7. 调用 transferAfterCancelledWait 将 Node 从 Condition 转移到 Sync Queue 里面
break;
}
if (nanosTimeout >= spinForTimeoutThreshold) // 8. 当剩余时间 < spinForTimeoutThreshold, 其实函数 spin 比用 LockSupport.parkNanos 更高效
LockSupport.parkNanos(this, nanosTimeout);// 9. 进行线程的 block
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)// 10. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
break; // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
nanosTimeout = deadline - System.nanoTime(); // 11. 计算剩余时间
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)// 12. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 13. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
unlinkCancelledWaiters(); // 14. 进行 cancelled 节点的清除
if (interruptMode != 0) // 15. "interruptMode != 0" 代表通过中断的方式唤醒线程
reportInterruptAfterWait(interruptMode); // 16. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
return deadline - System.nanoTime(); // 17 这个返回值代表是 通过 signal 还是 超时
} - Condition 释放锁 进行等待的方法 awaitUntil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public boolean awaitUntil(Date deadline) throws InterruptedException {
long abstime = deadline.getTime(); // 1. 判断线程是否中断
if(Thread.interrupted()){
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
boolean timeout = false;
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 4. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
if(System.currentTimeMillis() > abstime){ // 5. 计算是否超时
timeout = transferAfterCancelledWait(node); // 6. 调用 transferAfterCancelledWait 将 Node 从 Condition 转移到 Sync Queue 里面
break;
}
LockSupport.parkUntil(this, abstime); // 7. 进行 线程的阻塞
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 8. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
break; // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
}
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 9. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // 10. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
unlinkCancelledWaiters(); // 11. 进行 cancelled 节点的清除
}
if(interruptMode != 0){ // 12. "interruptMode != 0" 代表通过中断的方式唤醒线程
reportInterruptAfterWait(interruptMode); // 13. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
}
return !timeout; // 13. 返回是否通过 interrupt 进行线程的唤醒
}总结
Condition主要是为了在J.U.C框架中提供和Java传统的监视器风格的wait,notify和notifyAll方法类似的功能。