Java线程同步组件CyclicBarrier分析
简介
与 CountDownLatch 的实现方式不同,CyclicBarrier 并没有直接通过 AQS 实现同步功能,而是在重入锁 ReentrantLock 的基础上实现的。在 CyclicBarrier 中,线程访问 await 方法需先获取锁才能访问。在最后一个线程访问 await 方法前,其他线程进入 await 方法中后,会调用 Condition 的 await 方法进入等待状态。在最后一个线程进入 CyclicBarrier await 方法后,该线程将会调用 Condition 的 signalAll 方法唤醒所有处于等待状态中的线程。同时,最后一个进入 await 的线程还会重置 CyclicBarrier 的状态,使其可以重复使用。
在创建 CyclicBarrier 对象时,需要转入一个值,用于初始化 CyclicBarrier 的成员变量 parties,该成员变量表示屏障拦截的线程数。当到达屏障的线程数小于 parties 时,这些线程都会被阻塞住。当最后一个线程到达屏障后,此前被阻塞的线程才会被唤醒。
Demo例子
1 | package com.fly.learn.reentrantlock; |
1 | 2018-11-14 23:14:23.641 [main] INFO c.f.l.r.CyclicBarrierTest - main going to await |
源码分析
- 类图
CyclicBarrier 是基于重入锁 ReentrantLock 实现相关逻辑的。所以要弄懂 CyclicBarrier 的源码,仅需有 ReentrantLock 相关的背景知识即可。关于重入锁 ReentrantLock 方面的知识,有兴趣的朋友可以参考我之前写的文章 Java 重入锁 ReentrantLock原理分析。下面看一下 CyclicBarrier 的代码结构吧,如下:
- 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation {
// 用于记录屏障有没有被破坏
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;//线程数,即当 parties 个线程到达屏障后,屏障才会放行
/* The command to run when tripped */
private final Runnable barrierCommand;//回调对象,如果不为 null,会在第 parties 个线程到达屏障后被执行
/** The current generation */
/**
* CyclicBarrier 是可循环使用的屏障,这里使用 Generation 记录当前轮次 CyclicBarrier
* 的运行状态。当所有线程到达屏障后,generation 将会被更新,表示 CyclicBarrier 进入新一
* 轮的运行轮次中。
*/
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;//计数器,当 count > 0 时,到达屏障的线程会进入等待状态。当最后一个线程到达屏障后,count 自减至0。最后一个到达的线程会执行回调方法,并唤醒其他处于等待状态中的线程。
/**
* 创建一个允许 parties 个线程通行的屏障
* @param parties
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* 创建一个允许 parties 个线程通行的屏障,若 barrierAction 回调对象不为 null,
* 则在最后一个线程到达屏障后,执行相应的回调逻辑
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
} - await分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private final ReentrantLock lock = new ReentrantLock();
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
final Generation g = generation;
// 如果 g.broken = true,表明屏障被破坏了,这里直接抛出异常
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
// 如果线程中断,则调用 breakBarrier 破坏屏障
breakBarrier();
throw new InterruptedException();
}
/*
* index 表示线程到达屏障的顺序,index = parties - 1 表明当前线程是第一个
* 到达屏障的。index = 0,表明当前线程是最有一个到达屏障的。
*/
int index = --count;
if (index == 0) { // tripped
// 当 index = 0 时,唤醒所有处于等待状态的线程
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 如果回调对象不为 null,则执行回调
if (command != null)
command.run();
ranAction = true;
// 重置屏障状态,使其进入新一轮的运行过程中
nextGeneration();
return 0;
} finally {
// 若执行回调的过程中发生异常,此时调用 breakBarrier 破坏屏障
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//线程运行到此处的线程都会被屏障挡住,并进入等待状态。
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
/*
* 若下面的条件成立,则表明本轮运行还未结束。此时调用 breakBarrier
* 破坏屏障,唤醒其他线程,并抛出异常
*/
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
/*
* 若上面的条件不成立,则有两种可能:
* 1. g != generation
* 此种情况下,表明循环屏障的第 g 轮次的运行已经结束,屏障已经
* 进入了新的一轮运行轮次中。当前线程在稍后返回 到达屏障 的顺序即可
*
* 2. g = generation 但 g.broken = true
* 此种情况下,表明已经有线程执行过 breakBarrier 方法了,当前
* 线程则会在稍后抛出 BrokenBarrierException
*/
Thread.currentThread().interrupt();
}
}
// 屏障被破坏,则抛出 BrokenBarrierException 异常
if (g.broken)
throw new BrokenBarrierException();
// 屏障进入新的运行轮次,此时返回线程在上一轮次到达屏障的顺序
if (g != generation)
return index;
// 超时判断
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/**
* 开启新的一轮运行过程
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();// 唤醒所有处于等待状态中的线程
// set up next generation
count = parties;// 重置 count
generation = new Generation();// 重新创建 Generation,表明进入循环屏障进入新的一轮运行轮次中
}
/**
* 破坏屏障
*/
private void breakBarrier() {
generation.broken = true;// 设置屏障是否被破坏标志
count = parties;// 重置 count
trip.signalAll();// 唤醒所有处于等待状态中的线程
}
//------------------------AQS----------------------//
public final void signalAll() {
if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
// 保存condition队列头结点
Node first = firstWaiter;
if (first != null) // 头结点不为空
// 唤醒所有等待线程
doSignalAll(first);
}
private void doSignalAll(Node first) {
// condition队列的头结点尾结点都设置为空
lastWaiter = firstWaiter = null;
// 循环
do {
// 获取first结点的nextWaiter域结点
Node next = first.nextWaiter;
// 设置first结点的nextWaiter域为空
first.nextWaiter = null;
// 将first结点从condition队列转移到sync队列
transferForSignal(first);
// 重新设置first
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
private Node enq(final Node node) {
for (;;) { // 无限循环,确保结点能够成功入队列
// 保存尾结点
Node t = tail;
if (t == null) { // 尾结点为空,即还没被初始化
if (compareAndSetHead(new Node())) // 头结点为空,并设置头结点为新生成的结点
tail = head; // 头结点与尾结点都指向同一个新生结点
} else { // 尾结点不为空,即已经被初始化过
// 将node结点的prev域连接到尾结点
node.prev = t;
if (compareAndSetTail(t, node)) { // 比较结点t是否为尾结点,若是则将尾结点设置为node
// 设置尾结点的next域为node
t.next = node;
return t; // 返回尾结点
}
}
}
} - reset分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/**
* reset 方法用于强制重置屏障,使屏障进入新一轮的运行过程中
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 破坏屏障
breakBarrier(); // break the current generation
// 开启新一轮的运行过程
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
与CountDownLatch区别
差异点 | CountDownLatch | CyclicBarrier |
---|---|---|
是否可循环使用 | 否 | 是 |
是否可设置回调 | 否 | 是 |
总结
CyclicBarrier底层是基于ReentrantLock和AbstractQueuedSynchronizer来实现的.