AbstractQueuedSynchronizer源码分析

AbstractQueuedSynchronizer介绍

AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

AQS原理图

AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。

1
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过procted类型的getState,setState,compareAndSetState进行操作

1
2
3
4
5
6
7
8
9
10
11
12
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

AQS 对资源的共享方式

AQS定义两种资源共享方式

  1. Exclusive(独占):只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁:
    1.1 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
    1.2 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
  2. Share(共享):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock 我们都会在后面讲到。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源 state 的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在上层已经帮我们实现好了。

AQS底层使用了模板方法模式

同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

  1. 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
  2. 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

1
2
3
4
5
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS(Compare and Swap)减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

自定义实现独占锁Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.fly.learn.aqs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
* 独占锁自定义实现
* @author: peijiepang
* @date 2018/11/10
* @Description:
*/
public class ExclusiveLock {

private final static Logger LOGGER = LoggerFactory.getLogger(ExclusiveLock.class);

/**
* 独占锁实现
*/
private final class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
//当状态为0的时候获取锁,CAS操作成功,则state状态为1,
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
//释放锁,将同步状态置为0
if (getState() == 0) throw new IllegalMonitorStateException();

setExclusiveOwnerThread(null);
setState(0);
return true;
}

@Override
protected boolean isHeldExclusively() {
// 1表示处于独占状态
return getState() == 1;
}
}

/**
* 独占锁
*/
private final Sync sync = new Sync();

/**
* 加锁-阻塞方式
*/
public void lock(){
sync.acquire(1);
}

/**
* 尝试加锁,不一定成功
* @return
*/
public boolean tryLock(){
return sync.tryAcquire(1);
}

/**
* 解锁
*/
public void unLock(){
sync.release(1);
}

}

源码分析

我们先来简单描述下AQS的基本实现,前面我们提到过,AQS维护一个共享资源state,通过内置的FIFO来完成获取资源线程的排队工作。(这个内置的同步队列称为”CLH”队列)。该队列由一个一个的Node结点组成,每个Node结点维护一个prev引用和next引用,分别指向自己的前驱和后继结点。AQS维护两个指针,分别指向队列头部head和尾部tail。

当线程获取资源失败(比如tryAcquire时试图设置state状态失败),会被构造成一个结点加入CLH队列中,同时当前线程会被阻塞在队列中(通过LockSupport.park实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。

  1. Node结点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    static final class Node {
    /** waitStatus值,表示线程已被取消(等待超时或者被中断)*/
    static final int CANCELLED = 1;
    /** waitStatus值,表示后继线程需要被唤醒(unpaking)*/
    static final int SIGNAL = -1;
    /**waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /** waitStatus值,表示下一次共享式同步状态会被无条件地传播下去
    static final int PROPAGATE = -3;
    /** 等待状态,初始为0 */
    volatile int waitStatus;
    /**当前结点的前驱结点 */
    volatile Node prev;
    /** 当前结点的后继结点 */
    volatile Node next;
    /** 与当前结点关联的排队中的线程 */
    volatile Thread thread;
    /** ...... */
    }
  2. 独占式-acquire–获取同步状态

    1
    2
    3
    4
    5
    public final void acquire(int arg) {
    if (!tryAcquire(arg) && //调用使用者重写的tryAcquire方法,若返回true,意味着获取同步状态成功,后面的逻辑不再执行;若返回false,也就是获取同步状态失败
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //获取同步状态失败,构造独占式同步结点,通过addWatiter将此结点添加到同步队列的尾部(此时可能会有多个线程结点试图加入同步队列尾部,需要以线程安全的方 式添加)
    selfInterrupt(); //该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。
    }
  3. 独占式-addWaiter-为获取同步状态失败的线程,构造成一个Node结点,添加到同步队列尾部

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    /**
    * 构造新节点
    * @param mode
    * @return
    */
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail; //指向尾节点
    if (pred != null) {
    node.prev = pred;
    if (compareAndSetTail(pred, node)) { //如果尾结点不为空,CAS快速尝试在尾部添加,若CAS设置成功,返回
    pred.next = node;
    return node;
    }
    }
    enq(node);
    return node;
    }

    /**
    * 如果没有节点则自旋转设置节点
    * @param node
    * @return
    */
    private Node enq(final Node node) {
    for (;;) {
    Node t = tail;
    if (t == null) { // Must initialize
    if (compareAndSetHead(new Node())) //如果队列为空,创建结点,同时被head和tail引用
    tail = head;
    } else {
    node.prev = t;
    if (compareAndSetTail(t, node)) { //cas设置尾结点,不成功就一直重试
    t.next = node;
    return t;
    }
    }
    }
    }
  4. 独占式-acquireQueued–在等待队列中排队拿号
    acquireQueued内部也是一个死循环,只有前驱结点是头结点的结点,也就是老二结点,才有机会去tryAcquire;若tryAcquire成功,表示获取同步状态成功,将此结点设置为头结点;若是非老二结点,或者tryAcquire失败,则进入shouldParkAfterFailedAcquire去判断判断当前线程是否应该阻塞,若可以,调用parkAndCheckInterrupt阻塞当前线程,直到被中断或者被前驱结点唤醒。若还不能休息,继续循环。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    /**
    * 在等待队列中排队拿号
    * @param node
    * @param arg
    * @return
    */
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor(); //找到当前结点的前驱结点
    if (p == head && tryAcquire(arg)) { //如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。
    setHead(node); //获取同步状态成功,将当前结点设置为头结点。
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) && // 如果没有获取到同步状态,通过shouldParkAfterFailedAcquire判断是否应该阻塞,parkAndCheckInterrupt用来阻塞线程
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

    /**
    * 主要用于检查状态是否阻塞
    * @param pred
    * @param node
    * @return
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //拿到前驱的状态
    if (ws == Node.SIGNAL) //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
    /*
    * This node has already set status asking a release
    * to signal it, so it can safely park.
    */
    /*
    * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
    * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
    */
    return true;
    if (ws > 0) {
    /*
    * Predecessor was cancelled. Skip over predecessors and
    * indicate retry.
    */
    do {
    node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    } else {
    /*
    * waitStatus must be 0 or PROPAGATE. Indicate that we
    * need a signal, but don't park yet. Caller will need to
    * retry to make sure it cannot acquire before parking.
    */
    //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
    }

    private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); //调用park()使线程进入waiting状态
    return Thread.interrupted(); //如果被唤醒,查看自己是不是被中断的。
    }

    tryAcquire流程

    1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
    2. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
    3. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
      如下图:
  1. 独占式-release–释放同步状态

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public final boolean release(int arg) {
    if (tryRelease(arg)) { //调用使用者重写的tryRelease方法,若成功,唤醒其后继结点,失败则返回false
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h); //唤醒后继结点
    return true;
    }
    return false;
    }

    /**
    * 唤醒后继结点 
    * @param node
    */
    private void unparkSuccessor(Node node) {
    /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling. It is OK if this
    * fails or if status is changed by waiting thread.
    */
    int ws = node.waitStatus; //获取wait状态
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0); // 将等待状态waitStatus设置为初始值0

    /*
    * Thread to unpark is held in successor, which is normally
    * just the next node. But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) { //若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到一个处于正常阻塞状态的结点进行唤醒
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    if (s != null)
    LockSupport.unpark(s.thread); //使用LockSupprot唤醒结点对应的线程
    }
  2. 共享式-acquireShared–获取同步状态
    共享式:共享式地获取同步状态。对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,其他线程都得去排队等待,其待重写的尝试获取同步状态的方法tryAcquire返回值为boolean,这很容易理解;对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是“共享”的意义所在。其待重写的尝试获取同步状态的方法tryAcquireShared返回值为int。

    • 当返回值大于0时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
    • 当返回值等于0时,表示获取同步状态成功,但没有可用同步状态;
    • 当返回值小于0时,表示获取同步状态失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0) ///返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。
doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//构造一个共享结点,添加到同步队列尾部。若队列初始为空,先添加一个无意义的傀儡结点,再将新节点添加到队列尾部。
boolean failed = true;//是否获取成功
try {
boolean interrupted = false;//线程parking过程中是否被中断过
for (;;) {//死循环
final Node p = node.predecessor();//找到前驱结点
if (p == head) {//头结点持有同步状态,只有前驱是头结点,才有机会尝试获取同步状态
int r = tryAcquireShared(arg);//尝试获取同步装填
if (r >= 0) {//r>=0,获取成功
setHeadAndPropagate(node, r);//获取成功就将当前结点设置为头结点,若还有可用资源,传播下去,也就是继续唤醒后继结点
p.next = null; // 方便GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心进入parking状态
parkAndCheckInterrupt())//阻塞线程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
  1. 共享式-releaseShared–释放同步状态
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();//释放同步状态
    return true;
    }
    return false;
    }

    private void doReleaseShared() {
    for (;;) {//死循环,共享模式,持有同步状态的线程可能有多个,采用循环CAS保证线程安全
    Node h = head;
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    if (ws == Node.SIGNAL) {
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue;
    unparkSuccessor(h);//唤醒后继结点
    }
    else if (ws == 0 &&
    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;
    }
    if (h == head)
    break;
    }
    }

    总结

    AQS是JUC中很多同步组件的构建基础,简单来讲,它内部实现主要是状态变量state和一个FIFO队列来完成,同步队列的头结点是当前获取到同步状态的结点,获取同步状态state失败的线程,会被构造成一个结点(或共享式或独占式)加入到同步队列尾部(采用自旋CAS来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头结点的后继结点,使其加入对同步状态的争夺中。
    AQS为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需重写tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由AQS为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。