Java线程同步组件CyclicBarrier分析

简介

与 CountDownLatch 的实现方式不同,CyclicBarrier 并没有直接通过 AQS 实现同步功能,而是在重入锁 ReentrantLock 的基础上实现的。在 CyclicBarrier 中,线程访问 await 方法需先获取锁才能访问。在最后一个线程访问 await 方法前,其他线程进入 await 方法中后,会调用 Condition 的 await 方法进入等待状态。在最后一个线程进入 CyclicBarrier await 方法后,该线程将会调用 Condition 的 signalAll 方法唤醒所有处于等待状态中的线程。同时,最后一个进入 await 的线程还会重置 CyclicBarrier 的状态,使其可以重复使用。

在创建 CyclicBarrier 对象时,需要转入一个值,用于初始化 CyclicBarrier 的成员变量 parties,该成员变量表示屏障拦截的线程数。当到达屏障的线程数小于 parties 时,这些线程都会被阻塞住。当最后一个线程到达屏障后,此前被阻塞的线程才会被唤醒。

Demo例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.fly.learn.reentrantlock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* @author: peijiepang
* @date 2018/11/14
* @Description:
*/
public class CyclicBarrierTest {
private final static Logger LOGGER = LoggerFactory.getLogger(CyclicBarrierTest.class);

public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cb = new CyclicBarrier(3, new Thread("barrierAction") {
public void run() {
LOGGER.info(Thread.currentThread().getName() + " barrier action");
}
});
MyThread t1 = new MyThread("t1", cb);
MyThread t2 = new MyThread("t2", cb);
t1.start();
t2.start();
LOGGER.info(Thread.currentThread().getName() + " going to await");
cb.await();
LOGGER.info(Thread.currentThread().getName() + " continue");
}

}

class MyThread extends Thread {
private final static Logger LOGGER = LoggerFactory.getLogger(MyThread.class);

private CyclicBarrier cb;
public MyThread(String name, CyclicBarrier cb) {
super(name);
this.cb = cb;
}

public void run() {
LOGGER.info(Thread.currentThread().getName() + " going to await");
try {
cb.await();
LOGGER.info(Thread.currentThread().getName() + " continue");
} catch (Exception e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
2018-11-14 23:14:23.641 [main] INFO  c.f.l.r.CyclicBarrierTest - main going to await
2018-11-14 23:14:23.641 [t2] INFO com.fly.learn.reentrantlock.MyThread - t2 going to await
2018-11-14 23:14:23.640 [t1] INFO com.fly.learn.reentrantlock.MyThread - t1 going to await
2018-11-14 23:14:23.648 [t1] INFO c.f.l.r.CyclicBarrierTest - t1 barrier action
2018-11-14 23:14:23.648 [t1] INFO com.fly.learn.reentrantlock.MyThread - t1 continue
2018-11-14 23:14:23.649 [t2] INFO com.fly.learn.reentrantlock.MyThread - t2 continue
2018-11-14 23:14:23.650 [main] INFO c.f.l.r.CyclicBarrierTest - main continue

源码分析

  1. 类图
    CyclicBarrier 是基于重入锁 ReentrantLock 实现相关逻辑的。所以要弄懂 CyclicBarrier 的源码,仅需有 ReentrantLock 相关的背景知识即可。关于重入锁 ReentrantLock 方面的知识,有兴趣的朋友可以参考我之前写的文章 Java 重入锁 ReentrantLock原理分析。下面看一下 CyclicBarrier 的代码结构吧,如下:
  1. 构造函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    /**
    * Each use of the barrier is represented as a generation instance.
    * The generation changes whenever the barrier is tripped, or
    * is reset. There can be many generations associated with threads
    * using the barrier - due to the non-deterministic way the lock
    * may be allocated to waiting threads - but only one of these
    * can be active at a time (the one to which {@code count} applies)
    * and all the rest are either broken or tripped.
    * There need not be an active generation if there has been a break
    * but no subsequent reset.
    */
    private static class Generation {
    // 用于记录屏障有没有被破坏
    boolean broken = false;
    }

    /** The lock for guarding barrier entry */
    private final ReentrantLock lock = new ReentrantLock();
    /** Condition to wait on until tripped */
    private final Condition trip = lock.newCondition();
    /** The number of parties */
    private final int parties;//线程数,即当 parties 个线程到达屏障后,屏障才会放行
    /* The command to run when tripped */
    private final Runnable barrierCommand;//回调对象,如果不为 null,会在第 parties 个线程到达屏障后被执行
    /** The current generation */
    /**
    * CyclicBarrier 是可循环使用的屏障,这里使用 Generation 记录当前轮次 CyclicBarrier
    * 的运行状态。当所有线程到达屏障后,generation 将会被更新,表示 CyclicBarrier 进入新一
    * 轮的运行轮次中。
    */
    private Generation generation = new Generation();

    /**
    * Number of parties still waiting. Counts down from parties to 0
    * on each generation. It is reset to parties on each new
    * generation or when broken.
    */
    private int count;//计数器,当 count > 0 时,到达屏障的线程会进入等待状态。当最后一个线程到达屏障后,count 自减至0。最后一个到达的线程会执行回调方法,并唤醒其他处于等待状态中的线程。


    /**
    * 创建一个允许 parties 个线程通行的屏障
    * @param parties
    */
    public CyclicBarrier(int parties) {
    this(parties, null);
    }

    /**
    * 创建一个允许 parties 个线程通行的屏障,若 barrierAction 回调对象不为 null,
    * 则在最后一个线程到达屏障后,执行相应的回调逻辑
    */
    public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
    }
  2. await分析
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    public int await() throws InterruptedException, BrokenBarrierException {
    try {
    return dowait(false, 0L);
    } catch (TimeoutException toe) {
    throw new Error(toe); // cannot happen
    }
    }

    private final ReentrantLock lock = new ReentrantLock();

    /**
    * Main barrier code, covering the various policies.
    */
    private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
    final Generation g = generation;
    // 如果 g.broken = true,表明屏障被破坏了,这里直接抛出异常
    if (g.broken)
    throw new BrokenBarrierException();

    if (Thread.interrupted()) {
    // 如果线程中断,则调用 breakBarrier 破坏屏障
    breakBarrier();
    throw new InterruptedException();
    }

    /*
    * index 表示线程到达屏障的顺序,index = parties - 1 表明当前线程是第一个
    * 到达屏障的。index = 0,表明当前线程是最有一个到达屏障的。
    */
    int index = --count;
    if (index == 0) { // tripped
    // 当 index = 0 时,唤醒所有处于等待状态的线程
    boolean ranAction = false;
    try {
    final Runnable command = barrierCommand;
    // 如果回调对象不为 null,则执行回调
    if (command != null)
    command.run();
    ranAction = true;
    // 重置屏障状态,使其进入新一轮的运行过程中
    nextGeneration();
    return 0;
    } finally {
    // 若执行回调的过程中发生异常,此时调用 breakBarrier 破坏屏障
    if (!ranAction)
    breakBarrier();
    }
    }

    // loop until tripped, broken, interrupted, or timed out
    //线程运行到此处的线程都会被屏障挡住,并进入等待状态。
    for (;;) {
    try {
    if (!timed)
    trip.await();
    else if (nanos > 0L)
    nanos = trip.awaitNanos(nanos);
    } catch (InterruptedException ie) {
    /*
    * 若下面的条件成立,则表明本轮运行还未结束。此时调用 breakBarrier
    * 破坏屏障,唤醒其他线程,并抛出异常
    */
    if (g == generation && ! g.broken) {
    breakBarrier();
    throw ie;
    } else {
    // We're about to finish waiting even if we had not
    // been interrupted, so this interrupt is deemed to
    // "belong" to subsequent execution.
    /*
    * 若上面的条件不成立,则有两种可能:
    * 1. g != generation
    * 此种情况下,表明循环屏障的第 g 轮次的运行已经结束,屏障已经
    * 进入了新的一轮运行轮次中。当前线程在稍后返回 到达屏障 的顺序即可
    *
    * 2. g = generation 但 g.broken = true
    * 此种情况下,表明已经有线程执行过 breakBarrier 方法了,当前
    * 线程则会在稍后抛出 BrokenBarrierException
    */
    Thread.currentThread().interrupt();
    }
    }

    // 屏障被破坏,则抛出 BrokenBarrierException 异常
    if (g.broken)
    throw new BrokenBarrierException();

    // 屏障进入新的运行轮次,此时返回线程在上一轮次到达屏障的顺序
    if (g != generation)
    return index;

    // 超时判断
    if (timed && nanos <= 0L) {
    breakBarrier();
    throw new TimeoutException();
    }
    }
    } finally {
    lock.unlock();
    }
    }

    /**
    * 开启新的一轮运行过程
    */
    private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();// 唤醒所有处于等待状态中的线程
    // set up next generation
    count = parties;// 重置 count
    generation = new Generation();// 重新创建 Generation,表明进入循环屏障进入新的一轮运行轮次中
    }

    /**
    * 破坏屏障
    */
    private void breakBarrier() {
    generation.broken = true;// 设置屏障是否被破坏标志
    count = parties;// 重置 count
    trip.signalAll();// 唤醒所有处于等待状态中的线程
    }

    //------------------------AQS----------------------//
    public final void signalAll() {
    if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
    throw new IllegalMonitorStateException();
    // 保存condition队列头结点
    Node first = firstWaiter;
    if (first != null) // 头结点不为空
    // 唤醒所有等待线程
    doSignalAll(first);
    }

    private void doSignalAll(Node first) {
    // condition队列的头结点尾结点都设置为空
    lastWaiter = firstWaiter = null;
    // 循环
    do {
    // 获取first结点的nextWaiter域结点
    Node next = first.nextWaiter;
    // 设置first结点的nextWaiter域为空
    first.nextWaiter = null;
    // 将first结点从condition队列转移到sync队列
    transferForSignal(first);
    // 重新设置first
    first = next;
    } while (first != null);
    }

    final boolean transferForSignal(Node node) {
    /*
    * If cannot change waitStatus, the node has been cancelled.
    */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;

    /*
    * Splice onto queue and try to set waitStatus of predecessor to
    * indicate that thread is (probably) waiting. If cancelled or
    * attempt to set waitStatus fails, wake up to resync (in which
    * case the waitStatus can be transiently and harmlessly wrong).
    */
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
    return true;
    }

    private Node enq(final Node node) {
    for (;;) { // 无限循环,确保结点能够成功入队列
    // 保存尾结点
    Node t = tail;
    if (t == null) { // 尾结点为空,即还没被初始化
    if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点
    tail = head; // 头结点与尾结点都指向同一个新生结点
    } else { // 尾结点不为空,即已经被初始化过
    // 将node结点的prev域连接到尾结点
    node.prev = t;
    if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
    // 设置尾结点的next域为node
    t.next = node;
    return t; // 返回尾结点
    }
    }
    }
    }
  3. reset分析
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
    * reset 方法用于强制重置屏障,使屏障进入新一轮的运行过程中
    */
    public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    // 破坏屏障
    breakBarrier(); // break the current generation
    // 开启新一轮的运行过程
    nextGeneration(); // start a new generation
    } finally {
    lock.unlock();
    }
    }

与CountDownLatch区别

差异点 CountDownLatch CyclicBarrier
是否可循环使用
是否可设置回调

总结

CyclicBarrier底层是基于ReentrantLock和AbstractQueuedSynchronizer来实现的.