ReentrantLock之Condition源码分析

ReentrantLock 定义

Condition是JUC里面提供于控制线程释放锁, 然后进行等待其他获取锁的线程发送 signal 信号来进行唤醒的工具类.

主要特点

  • Condition内部主要是由一个装载线程节点 Node 的 Condition Queue 实现
  • 对 Condition 的方法(await, signal等) 的调用必需是在本线程获取了独占锁的前提下
  • 因为操作Condition的方法的前提是获取独占锁, 所以 Condition Queue 内部是一条不支持并发安全的单向 queue (这是相对于 AQS 里面的 Sync Queue)

    Demo实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    package com.fly.learn.reentrantlock;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    /**
    * @author: peijiepang
    * @date 2018/11/8
    * @Description:
    */
    public class ConditionTest extends Thread{

    private final static Logger LOGGER = LoggerFactory.getLogger(ConditionTest.class);

    private ReentrantLock lock = null;
    private Condition condition = null;

    public ConditionTest(String name,ReentrantLock lock,Condition condition) {
    super(name);
    this.lock = lock;
    this.condition = condition;
    }

    @Override
    public void run() {
    lock.lock();
    try{
    LOGGER.info("thread name:{} lock success.",Thread.currentThread().getName());
    if(Thread.currentThread().getName().equals("test1")){
    condition.await();//释放锁,然后等待唤醒
    LOGGER.info("thread name:{} 被唤醒,即将unlock.",Thread.currentThread().getName());
    }else if(Thread.currentThread().getName().equals("test2")) {
    condition.signal();//唤醒等待线程
    LOGGER.info("thread name:{} 唤醒队列中的线程,即将unlock.",Thread.currentThread().getName());
    }
    }catch (InterruptedException ex){
    ex.printStackTrace();
    }finally {
    lock.unlock();
    LOGGER.info("thread name:{} unlock success.",Thread.currentThread().getName());
    }
    }

    public static void main(String[] args) {
    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();
    ConditionTest test1 = new ConditionTest("test1",reentrantLock,condition);
    ConditionTest test2 = new ConditionTest("test2",reentrantLock,condition);
    test1.start();
    test2.start();
    }
    }
    从如下执行结果来看,线程1先await释放锁,然后线程2获取到锁,接着线程2唤醒等待锁,然后线程2释放锁,最后线程1解锁等待中的锁。
    1
    2
    3
    4
    5
    6
    2018-11-08 23:17:50.065 [test1] INFO  c.f.l.reentrantlock.ConditionTest - thread name:test1 lock success.
    2018-11-08 23:17:50.072 [test2] INFO c.f.l.reentrantlock.ConditionTest - thread name:test2 lock success.
    2018-11-08 23:17:50.072 [test2] INFO c.f.l.reentrantlock.ConditionTest - thread name:test2 唤醒队列中的线程,即将unlock.
    2018-11-08 23:17:50.072 [test2] INFO c.f.l.reentrantlock.ConditionTest - thread name:test2 unlock success.
    2018-11-08 23:17:50.072 [test1] INFO c.f.l.reentrantlock.ConditionTest - thread name:test1 被唤醒,即将unlock.
    2018-11-08 23:17:50.072 [test1] INFO c.f.l.reentrantlock.ConditionTest - thread name:test1 unlock success.

    类图

源码分析

  1. 构造函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /** First node of condition queue. */
    /** Condition Queue 里面的头节点 */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    /** Condition Queue 里面的尾节点 */
    private transient Node lastWaiter;

    /**
    * Creates a new {@code ConditionObject} instance.
    */
    public ConditionObject() { }
  2. Condition Queue enqueue节点方法 addConditionWaiter
    addConditionWaiter方法主要用于调用 Condition.await 时将当前节点封装成 一个Node, 加入到 Condition Queue里面.
    大家可以注意下, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * Adds a new waiter to wait queue
    * 将当前线程封装成一个 Node 节点 放入大 Condition Queue 里面
    * 大家可以注意到, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况
    * @return
    */
    private Node addConditionWaiter() {
    Node t = lastWaiter; // 1. Condition queue 的尾节点
    // If lastWaiter is cancelled, clean out. // 2.尾节点已经Cancel, 直接进行清除,
    // 这里有1个问题, 1 何时出现t.waitStatus != Node.CONDITION -> 在对线程进行中断时 ConditionObject -> await -> checkInterruptWhileWaiting -> transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 导致这种情况一般是 线程中断或 await 超时
    // 一个注意点: 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其他 await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal 进行唤醒时 node.nextWaiter 会被置空, 而中断和超时时不会)
    if (t != null && t.waitStatus != Node.CONDITION) {
    unlinkCancelledWaiters(); // 3. 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt))
    t = lastWaiter; // 4. 获取最新的 lastWaiter
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);// 5. 将线程封装成 node 准备放入 Condition Queue 里面
    if (t == null)
    firstWaiter = node; // 6 .Condition Queue 是空的
    else
    t.nextWaiter = node; // 7. 最加到 queue 尾部
    lastWaiter = node; // 8. 重新赋值 lastWaiter
    return node;
    }
  3. Condition 唤醒 first节点方法 doSignal
    这里的唤醒指的是将节点从 Condition Queue 转移到 Sync Queue 里面
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
    * 唤醒 Condition Queue 里面的头节点, 注意这里的唤醒只是将 Node 从 Condition Queue 转到 Sync Queue 里面(这时的 Node 也还是能被 Interrupt)
    */
    private void doSignal(Node first) {
    do {
    if ( (firstWaiter = first.nextWaiter) == null) // 1. 将 first.nextWaiter 赋值给 nextWaiter 为下次做准备
    lastWaiter = null; // 2. 这时若 nextWaiter == null, 则说明 Condition 为空了, 所以直接置空 lastWaiter
    first.nextWaiter = null;
    } while (!transferForSignal(first) && // 3. first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt
    (first = firstWaiter) != null); // 4. 调用 transferForSignal将 first 转移到 Sync Queue 里面, 返回不成功的话, 将 firstWaiter 赋值给 first
    }
  4. Condition 唤醒 所有 节点方法 doSignalAll
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * 唤醒 Condition Queue 里面的所有的节点
    */
    private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null; // 1. 将 lastWaiter, firstWaiter 置空
    do {
    Node next = first.nextWaiter; // 2. 初始化下个换新的节点
    first.nextWaiter = null; // 3. first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt
    transferForSignal(first); // 4. 调用 transferForSignal将 first 转移到 Sync Queue 里面
    first = next; // 5. 开始换新 next 节点
    } while (first != null);
    }
  5. Condition 删除取消节点的方法 unlinkCancelledWaiters
    一般的节点都会被 signal 唤醒, 从 Condition Queue 转移到 Sync Queue, 而若遇到 interrupt 或 等待超时, 则直接改变 node 的状态(从 CONDITION 变成 0), 并直接放入 Sync 里面, 而不清理Condition Queue 里面的节点, 所以需要下面的函数
    毫无疑问, 这是一段非常精巧的queue节点删除, 主要还是在 节点 trail 上, trail 节点可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * 在 调用 addConditionWaiter 将线程放入 Condition Queue 里面时 或 awiat 方法获取 差不多结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点
    * 这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点
    */
    private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
    Node next = t.nextWaiter; // 1. 先初始化 next 节点
    if (t.waitStatus != Node.CONDITION) { // 2. 节点不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)
    t.nextWaiter = null; // 3. Node.nextWaiter 置空
    if (trail == null) // 4. 一次都没有遇到有效的节点
    firstWaiter = next; // 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理)
    else
    trail.nextWaiter = next; // 6. next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t
    if (next == null)
    lastWaiter = trail; // 7. next == null 说明 已经 traverse 完了 Condition Queue
    }
    else
    trail = t; // 8. 将有效节点赋值给 trail
    t = next;
    }
    }
  6. Condition 唤醒首节点方法 signal
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * 将 Condition queue 的头节点转移到 Sync Queue 里面
    * 在进行调用 signal 时, 当前的线程必须获取了 独占的锁
    */
    public final void signal() {
    if (!isHeldExclusively()) // 1. 判断当前的线程是否已经获取 独占锁
    throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    doSignal(first); // 2. 调用 doSignal 进行转移
    }

  7. Condition 唤醒所有节点方法 signalAll
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 将 Condition Queue 里面的节点都转移到 Sync Queue 里面
    */
    public final void signalAll() {
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    doSignalAll(first);
    }
  8. Condition 释放锁进行等待方法 awaitUninterruptibly
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * 不响应线程中断的方式进行 await
    */
    public final void awaitUninterruptibly() {
    Node node = addConditionWaiter(); // 1. 将当前线程封装成一个 Node 放入 Condition Queue 里面
    int savedState = fullyRelease(node); // 2. 释放当前线程所获取的所有的独占锁(PS: 独占的锁支持重入), 等等, 为什么要释放呢? 以为你调用 awaitUninterruptibly 方法的前提就是你已经获取了 独占锁
    boolean interrupted = false; // 3. 线程中断标识
    while (!isOnSyncQueue(node)) { // 4. 这里是一个 while loop, 调用 isOnSyncQueue 判断当前的 Node 是否已经被转移到 Sync Queue 里面
    LockSupport.park(this); // 5. 若当前 node 不在 sync queue 里面, 则先 block 一下等待其他线程调用 signal 进行唤醒; (这里若有其他线程对当前线程进行 中断的换, 也能进行唤醒)
    if (Thread.interrupted()) // 6. 判断这是唤醒是 signal 还是 interrupted(Thread.interrupted()会清楚线程的中断标记, 但没事, 我们有步骤7中的interrupted进行记录)
    interrupted = true; // 7. 说明这次唤醒是被中断而唤醒的,这个标记若是true的话, 在 awiat 离开时还要 自己中断一下(selfInterrupt), 其他的函数可能需要线程的中断标识
    }
    if (acquireQueued(node, savedState) || interrupted) // 8. acquireQueued 返回 true 说明线程在 block 的过程中式被 inetrrupt 过(其实 acquireQueued 返回 true 也有可能其中有一次唤醒是 通过 signal)
    selfInterrupt(); // 9. 自我中断, 外面的线程可以通过这个标识知道, 整个 awaitUninterruptibly 运行过程中 是否被中断过
    }
  9. Condition 释放锁 进行等待的方法 await
    await 此方法响应中断请求, 当接受到中断请求后会将节点从 Condition Queue 转移到 Sync Queue
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    /**
    * 支持 InterruptedException 的 await <- 注意这里即使是线程被中断,
    * 还是需要获取了独占的锁后, 再 调用 lock.unlock 进行释放锁
    */
    public final void await() throws InterruptedException {
    if (Thread.interrupted()) // 1. 判断线程是否中断
    throw new InterruptedException();
    Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
    int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 4. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
    LockSupport.park(this); // 5. 当前线程没在 Sync Queue 里面, 则进行 block
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)// 6. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
    break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)// 7. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
    interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled // 8. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
    unlinkCancelledWaiters(); // 9. 进行 cancelled 节点的清除
    if (interruptMode != 0) // 10. "interruptMode != 0" 代表通过中断的方式唤醒线程
    reportInterruptAfterWait(interruptMode); // 11. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
    }
  10. Condition 释放锁 进行等待的方法 awaitNanos
    awaitNanos 具有超时功能, 与响应中断的功能, 不管中断还是超时都会 将节点从 Condition Queue 转移到 Sync Queue
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
    * 所有 awaitXX 方法其实就是
    * 0. 将当前的线程封装成 Node 加入到 Condition 里面
    * 1. 丢到当前线程所拥有的 独占锁,
    * 2. 等待 其他获取 独占锁的线程的唤醒, 唤醒从 Condition Queue 到 Sync Queue 里面, 进而获取 独占锁
    * 3. 最后获取 lock 之后, 在根据线程唤醒的方式(signal/interrupt) 进行处理
    * 4. 最后还是需要调用 lock./unlock 进行释放锁
    */
    public final long awaitNanos(long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted()) // 1. 判断线程是否中断
    throw new InterruptedException();
    Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
    int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
    final long deadline = System.nanoTime() + nanosTimeout; // 4. 计算 wait 的截止时间
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 5. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
    if (nanosTimeout <= 0L) { // 6. 等待时间超时(这里的 nanosTimeout 是有可能 < 0),
    transferAfterCancelledWait(node); // 7. 调用 transferAfterCancelledWait 将 Node 从 Condition 转移到 Sync Queue 里面
    break;
    }
    if (nanosTimeout >= spinForTimeoutThreshold) // 8. 当剩余时间 < spinForTimeoutThreshold, 其实函数 spin 比用 LockSupport.parkNanos 更高效
    LockSupport.parkNanos(this, nanosTimeout);// 9. 进行线程的 block
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)// 10. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
    break; // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
    nanosTimeout = deadline - System.nanoTime(); // 11. 计算剩余时间
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)// 12. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
    interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 13. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
    unlinkCancelledWaiters(); // 14. 进行 cancelled 节点的清除
    if (interruptMode != 0) // 15. "interruptMode != 0" 代表通过中断的方式唤醒线程
    reportInterruptAfterWait(interruptMode); // 16. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
    return deadline - System.nanoTime(); // 17 这个返回值代表是 通过 signal 还是 超时
    }
  11. Condition 释放锁 进行等待的方法 awaitUntil
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    @Override
    public boolean awaitUntil(Date deadline) throws InterruptedException {
    long abstime = deadline.getTime(); // 1. 判断线程是否中断
    if(Thread.interrupted()){
    throw new InterruptedException();
    }
    Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
    int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
    boolean timeout = false;
    int interruptMode = 0;
    while(!isOnSyncQueue(node)){ // 4. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
    if(System.currentTimeMillis() > abstime){ // 5. 计算是否超时
    timeout = transferAfterCancelledWait(node); // 6. 调用 transferAfterCancelledWait 将 Node 从 Condition 转移到 Sync Queue 里面
    break;
    }
    LockSupport.parkUntil(this, abstime); // 7. 进行 线程的阻塞
    if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 8. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
    break; // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
    }
    }

    if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 9. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
    interruptMode = REINTERRUPT;
    }
    if(node.nextWaiter != null){ // 10. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
    unlinkCancelledWaiters(); // 11. 进行 cancelled 节点的清除
    }
    if(interruptMode != 0){ // 12. "interruptMode != 0" 代表通过中断的方式唤醒线程
    reportInterruptAfterWait(interruptMode); // 13. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
    }

    return !timeout; // 13. 返回是否通过 interrupt 进行线程的唤醒
    }

    总结

    Condition主要是为了在J.U.C框架中提供和Java传统的监视器风格的wait,notify和notifyAll方法类似的功能。