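An Analysis of Semaphore, a Java Thread Synchronization Component
Definition
Semaphore is a utility class that caps how many threads may execute a piece of code concurrently. Internally it uses a permit count to define that limit; in essence it is a shared lock built on AQS (AbstractQueuedSynchronizer).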
Class diagram
Demo
package com.fly.learn.reentrantlock;
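The output is as follows:
2018-11-17 21:05:46.644 [pool-2-thread-1] INFO c.f.l.reentrantlock.SemaphoreTest - pool-2-thread-1......
As the output shows, two permits are acquired every second, which matches what we expect from a Semaphore configured with 2 permits.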
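Since only the package line of the demo survives above, here is a minimal sketch of what such a demo might look like. It is not the original SemaphoreTest: the class name, the `runDemo` helper, and its parameters are assumptions made for illustration. It starts several workers against a Semaphore and records the peak number of workers inside the guarded section at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the elided demo; names are illustrative only.
class SemaphoreDemoSketch {

    /** Runs `tasks` workers against `permits` permits and returns the peak concurrency observed. */
    static int runDemo(int permits, int tasks, long holdMillis) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger inside = new AtomicInteger(); // workers currently holding a permit
        AtomicInteger peak = new AtomicInteger();   // highest value `inside` ever reached
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    semaphore.acquire();
                    try {
                        peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
                        System.out.println(Thread.currentThread().getName() + "......");
                        Thread.sleep(holdMillis);
                    } finally {
                        inside.decrementAndGet();
                        semaphore.release(); // always hand the permit back
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // With 2 permits, the peak should never exceed 2.
        System.out.println("peak concurrency = " + runDemo(2, 6, 1000));
    }
}
```

The permit is released in a `finally` block so that a failing worker cannot leak it; the peak counter is only there to make the limit observable.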
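Characteristics
1. Semaphore's methods are implemented by delegating to Sync, a subclass of AQS.
2. It supports fair and nonfair modes. Both are implemented in subclasses of Sync, and they differ mainly in tryAcquireShared.
Source code walkthrough
- Constructors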
All of Semaphore's functionality is delegated to the inner classes NonfairSync and FairSync.
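Before the JDK listing, a small usage sketch of the two constructors may help. The class and method names here are made up for illustration; only `Semaphore`, `isFair`, `tryAcquire`, and `release` are real JDK API. It also exercises the case mentioned in the Javadoc where the initial permit count is negative.

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch only; ConstructorSketch is not part of the JDK or the original post.
class ConstructorSketch {

    /** True if a semaphore created with -1 permits needs two releases before an acquire succeeds. */
    static boolean negativeStartNeedsTwoReleases() {
        Semaphore s = new Semaphore(-1);
        boolean before = s.tryAcquire();   // state is -1, nothing to take
        s.release();                       // state becomes 0
        boolean afterOne = s.tryAcquire(); // still nothing to take
        s.release();                       // state becomes 1
        boolean afterTwo = s.tryAcquire(); // succeeds, state back to 0
        return !before && !afterOne && afterTwo;
    }

    public static void main(String[] args) {
        System.out.println(new Semaphore(2).isFair());        // one-arg constructor: nonfair
        System.out.println(new Semaphore(2, true).isFair());  // fairness requested explicitly
        System.out.println(negativeStartNeedsTwoReleases());
    }
}
```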
/**
* Uses nonfair mode by default.
* Creates a {@code Semaphore} with the given number of
* permits and nonfair fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* The caller specifies fair or nonfair mode explicitly.
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
- The inner class Sync
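The Sync methods in this part back several public and protected methods of Semaphore. As a rough sketch of how two of them behave from the outside, the example below calls `drainPermits` and, through a small subclass, the protected `reducePermits`. The names `SyncBackedMethodsSketch`, `ResizableSemaphore`, and `shrinkBy` are hypothetical.

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch; the class names are assumptions, the Semaphore calls are real JDK API.
class SyncBackedMethodsSketch {

    /** reducePermits is protected in Semaphore, so a subclass is needed to reach it. */
    static final class ResizableSemaphore extends Semaphore {
        ResizableSemaphore(int permits) {
            super(permits);
        }

        void shrinkBy(int n) {
            reducePermits(n); // never blocks; the count may become negative
        }
    }

    /** Returns {permits drained, permits left after draining, permits after shrinking}. */
    static int[] demo() {
        ResizableSemaphore s = new ResizableSemaphore(3);
        int drained = s.drainPermits();        // takes everything that is available
        int afterDrain = s.availablePermits();
        s.release(3);                          // back to 3
        s.shrinkBy(5);                         // 3 - 5
        int afterShrink = s.availablePermits();
        return new int[] {drained, afterDrain, afterShrink};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "3 0 -2"
    }
}
```

Unlike an acquire, `reducePermits` does not wait for permits to become available, which is why the count can drop below zero here.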
/**
* Subclass of AQS that mainly defines how the lock (the permits) is acquired and released.
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
/**
* Initializes the Semaphore with the given number of permits.
* The permit count is stored in the AQS state field.
* @param permits
*/
Sync(int permits) {
setState(permits);
}
/**
* Returns the current number of permits.
* @return
*/
final int getPermits() {
return getState();
}
/**
* Acquires the given number of permits in nonfair mode.
* @param acquires
* @return
*/
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires; // permits left after taking `acquires`
if (remaining < 0 ||
compareAndSetState(available, remaining)) // update state with CAS
return remaining;
}
}
/**
* Releases permits.
* @param releases
* @return
*/
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
/**
* Reduces the number of permits.
* @param reductions
*/
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
/**
* Sets the permit count to 0 and returns the number of permits drained.
* @return
*/
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
- The inner classes FairSync and NonfairSync
Both classes extend Sync. They differ mainly in whether, at acquire time, they check for threads already waiting in the AQS sync queue.
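One way to observe this difference from the outside is the barging behavior of `tryAcquire()`. The sketch below, with hypothetical names `FairnessSketch` and `bargingDemo`, parks one thread in the sync queue of a fair semaphore and then compares the timed `tryAcquire(0, TimeUnit.SECONDS)`, which goes through the fair `tryAcquireShared`, with the plain `tryAcquire()`, which always uses the nonfair path.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; class and method names are assumptions.
class FairnessSketch {

    /** Returns {timed tryAcquire result, plain tryAcquire result} while another thread is queued. */
    static boolean[] bargingDemo() {
        Semaphore fair = new Semaphore(1, true);
        // The waiter asks for 2 permits but only 1 exists, so it parks in the sync queue.
        Thread waiter = new Thread(() -> fair.acquireUninterruptibly(2));
        waiter.start();
        while (!fair.hasQueuedThreads()) {
            Thread.yield(); // wait until the waiter is really queued
        }

        boolean timed;
        try {
            // Fair path: sees a queued predecessor and gives up at once.
            timed = fair.tryAcquire(0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timed = false;
        }
        // Nonfair path: takes the free permit even though a thread is waiting.
        boolean plain = fair.tryAcquire();

        fair.release(2); // give the waiter what it needs so it can finish
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {timed, plain};
    }

    public static void main(String[] args) {
        boolean[] r = bargingDemo();
        System.out.println("timed=" + r[0] + ", plain=" + r[1]);
    }
}
```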
/**
* Nonfair mode
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires); // delegates directly to the parent class method
}
}
/**
* Fair mode
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
/**
* The fair version first checks whether a predecessor node is queued before taking permits.
* @param acquires
* @return
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors()) // 1. is there a predecessor node in the sync queue?
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) // 2. update state with CAS
return remaining;
}
}
}
- Ways to acquire a permit
Mostly the single-permit acquire methods are shown here; the multi-permit variants only add a precondition check on the argument.
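As a quick orientation before the listing, here is a hedged sketch of how the acquire variants behave from the caller's side. `AcquireVariantsSketch` and its two helper methods are names invented for this example.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch; only the Semaphore and Thread calls are real JDK API.
class AcquireVariantsSketch {

    /** Returns {first tryAcquire, second tryAcquire, timed tryAcquire} on a 1-permit semaphore. */
    static boolean[] tryVariants() {
        Semaphore s = new Semaphore(1);
        boolean first = s.tryAcquire();  // a permit is available
        boolean second = s.tryAcquire(); // none left, returns immediately
        boolean timed;
        try {
            timed = s.tryAcquire(1, 50, TimeUnit.MILLISECONDS); // waits up to 50 ms, then gives up
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            timed = false;
        }
        return new boolean[] {first, second, timed};
    }

    /** True if a thread calling acquire() on an empty semaphore is released by an interrupt. */
    static boolean acquireIsInterruptible() {
        Semaphore s = new Semaphore(0);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            try {
                s.acquire(); // can never succeed: nobody releases
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        waiter.start();
        waiter.interrupt(); // either pending on entry or delivered while parked
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        boolean[] r = tryVariants();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints "true false false"
        System.out.println(acquireIsInterruptible());       // prints "true"
    }
}
```

`acquireUninterruptibly()` is left out on purpose: with no permit and no releaser, it would keep waiting even after the interrupt.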
/**
* Acquires a permit interruptibly by calling acquireSharedInterruptibly.
* Acquires a permit from this semaphore, blocking until one is
* available, or the thread is {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* one of two things happens:
* <ul>
* <li>Some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* for a permit,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* @throws InterruptedException if the current thread is interrupted
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS code: the call enters this AQS method, which in turn calls the subclass's tryAcquireShared
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* acquireUninterruptibly acquires a permit without responding to interrupts (it calls acquireShared).
* Acquires a permit from this semaphore, blocking until one is
* available.
*
* <p>Acquires a permit, if one is available and returns immediately,
* reducing the number of available permits by one.
*
* <p>If no permit is available then the current thread becomes
* disabled for thread scheduling purposes and lies dormant until
* some other thread invokes the {@link #release} method for this
* semaphore and the current thread is next to be assigned a permit.
*
* <p>If the current thread is {@linkplain Thread#interrupt interrupted}
* while waiting for a permit then it will continue to wait, but the
* time at which the thread is assigned a permit may change compared to
* the time it would have received the permit had no interruption
* occurred. When the thread does return from this method its interrupt
* status will be set.
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// AQS source
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Tries to acquire a permit; this simply calls the nonfair acquire method, even on a fair semaphore.
* Acquires a permit from this semaphore, only if one is available at the
* time of invocation.
*
* <p>Acquires a permit, if one is available and returns immediately,
* with the value {@code true},
* reducing the number of available permits by one.
*
* <p>If no permit is available then this method will return
* immediately with the value {@code false}.
*
* <p>Even when this semaphore has been set to use a
* fair ordering policy, a call to {@code tryAcquire()} <em>will</em>
* immediately acquire a permit if one is available, whether or not
* other threads are currently waiting.
* This "barging" behavior can be useful in certain
* circumstances, even though it breaks fairness. If you want to honor
* the fairness setting, then use
* {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }
* which is almost equivalent (it also detects interruption).
*
* @return {@code true} if a permit was acquired and {@code false}
* otherwise
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
* Tries to acquire permits, with support for timeout and interruption.
* Acquires the given number of permits from this semaphore, if all
* become available within the given waiting time and the current
* thread has not been {@linkplain Thread#interrupt interrupted}.
*
* <p>Acquires the given number of permits, if they are available and
* returns immediately, with the value {@code true},
* reducing the number of available permits by the given amount.
*
* <p>If insufficient permits are available then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until one of three things happens:
* <ul>
* <li>Some other thread invokes one of the {@link #release() release}
* methods for this semaphore, the current thread is next to be assigned
* permits and the number of available permits satisfies this request; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>The specified waiting time elapses.
* </ul>
*
* <p>If the permits are acquired then the value {@code true} is returned.
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* to acquire the permits,
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
* Any permits that were to be assigned to this thread, are instead
* assigned to other threads trying to acquire permits, as if
* the permits had been made available by a call to {@link #release()}.
*
* <p>If the specified waiting time elapses then the value {@code false}
* is returned. If the time is less than or equal to zero, the method
* will not wait at all. Any permits that were to be assigned to this
* thread, are instead assigned to other threads trying to acquire
* permits, as if the permits had been made available by a call to
* {@link #release()}.
*
* @param permits the number of permits to acquire
* @param timeout the maximum time to wait for the permits
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if all permits were acquired and {@code false}
* if the waiting time elapsed before all permits were acquired
* @throws InterruptedException if the current thread is interrupted
* @throws IllegalArgumentException if {@code permits} is negative
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
// AQS source
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
- Ways to release a permit
Only release(int permits) is shown here; the single-permit release() is the same call without the precondition check on the argument.
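The following sketch, under the assumed name `ReleaseSketch`, illustrates three consequences of the release code in this part: a release does not have to be paired with an earlier acquire, a negative argument is rejected, and overflowing the permit count raises an `Error`.

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch; class and method names are assumptions.
class ReleaseSketch {

    /** Releasing without acquiring simply adds a permit on top of the initial count. */
    static int releaseWithoutAcquire() {
        Semaphore s = new Semaphore(2);
        s.release();
        return s.availablePermits();
    }

    /** release(int) validates its argument before touching the state. */
    static boolean negativeReleaseRejected() {
        try {
            new Semaphore(1).release(-1);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** tryReleaseShared detects int overflow and throws Error. */
    static boolean overflowThrowsError() {
        try {
            new Semaphore(Integer.MAX_VALUE).release();
            return false;
        } catch (Error expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(releaseWithoutAcquire());   // prints 3
        System.out.println(negativeReleaseRejected()); // prints true
        System.out.println(overflowThrowsError());     // prints true
    }
}
```

Because nothing ties a permit to the thread that acquired it, keeping acquires and releases balanced is up to the application, as the Javadoc notes.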
/**
* Releases permits.
* Releases the given number of permits, returning them to the semaphore.
*
* <p>Releases the given number of permits, increasing the number of
* available permits by that amount.
* If any threads are trying to acquire permits, then one
* is selected and given the permits that were just released.
* If the number of available permits satisfies that thread's request
* then that thread is (re)enabled for thread scheduling purposes;
* otherwise the thread will wait until sufficient permits are available.
* If there are still permits available
* after this thread's request has been satisfied, then those permits
* are assigned in turn to other threads trying to acquire permits.
*
* <p>There is no requirement that a thread that releases a permit must
* have acquired that permit by calling {@link Semaphore#acquire acquire}.
* Correct usage of a semaphore is established by programming convention
* in the application.
*
* @param permits the number of permits to release
* @throws IllegalArgumentException if {@code permits} is negative
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
// AQS source
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
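Summary
Semaphore uses the AQS state field to control how permits are acquired; in effect it is a ReadLock with a bounded count. To really understand Semaphore, however, you still need to read the AbstractQueuedSynchronizer source analysis.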
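To make the summary concrete, here is a minimal sketch of a semaphore built directly on AQS in shared mode. It is a simplified, nonfair illustration modeled on the Sync code walked through in this article, not the JDK implementation; the name `MiniSemaphore` and its method set are assumptions.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Simplified illustration only: nonfair, no timed or interruptible variants.
class MiniSemaphore {

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) {
            setState(permits); // the AQS state is the permit count
        }

        int permits() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // A negative result tells AQS to queue the caller.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) { // overflow
                    throw new Error("Maximum permit count exceeded");
                }
                if (compareAndSetState(current, next)) {
                    return true; // lets AQS wake queued threads
                }
            }
        }
    }

    private final Sync sync;

    MiniSemaphore(int permits) {
        sync = new Sync(permits);
    }

    void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    boolean tryAcquire() {
        return sync.tryAcquireShared(1) >= 0;
    }

    void release() {
        sync.releaseShared(1);
    }

    int availablePermits() {
        return sync.permits();
    }

    public static void main(String[] args) {
        MiniSemaphore s = new MiniSemaphore(2);
        System.out.println(s.tryAcquire()); // true
        System.out.println(s.tryAcquire()); // true
        System.out.println(s.tryAcquire()); // false, no permits left
        s.release();
        System.out.println(s.availablePermits()); // 1
    }
}
```

Queueing, parking, and waking of threads are all inherited from AQS; the subclass only decides how the state changes, which is why the AQS source is the next thing to read.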