logo头像
Snippet 博客主题

【Java并发编程实战】-解读J.U.C并发包之AQS

1. AQS介绍

1.1 简要说明

所谓的AQS,其实指的是Java并发包(java.util.concurrent)中的抽象类AbstractQueuedSynchronizer。

AQS的核心是一个线程等待队列,采用的是一个先进先出FIFO队列。它可以用于构建锁或者其他相关同步的基础框架。

Java并发包下的ThreadPoolExecutor.Worker、Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock都是基于AQS来实现的。因此AQS是学习并发包重要武器。

我们打开源码,通读一下文档注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 提供一个基于先进先出(FIFO)等待队列实现的阻塞锁和相关同步器(semaphores,events等)的框架。
* 该类的设计为大多数类型的同步器提供了有用的基础,这些同步器依赖于单个原子int值来表示状态(state)。
* 子类必须定义更改此状态的protected方法,以及根据该对象被获取或释放来定义该状态的含义。
* 鉴于这些,这个类中的其他方法执行所有排队和阻塞机制。子类可以保持其他状态字段,但只能以原子方式更新。
* 使用getState()、setState(int)和compareAndSetState(int, int)来保持同步化。
*
* 子类应定义为非公共内部帮助类,用于实现其外部类的同步属性。
* AbstractQueuedSynchronizer类不实现任何同步接口。
* 相反,它定义了一些方法,如acquireInterruptibly(int),
* 可以通过具体的锁和相关同步器来调用适当执行其public方法。
*
* 此类默认支持互斥模式(exclusive mode-互斥锁)和共享模式(shared mode-共享锁)。
* 当获以互斥模式获取时,其他线程尝试获取是无法成功成功的。
* 由多个线程获取的共享模式可能(但不一定)成功。
* 除了在机制意义上,这个类不理解这些差异,当共享模式获取成功时,
* 下一个等待线程(如果存在)也必须确定它是否也可以获取。
* 在不同模式下等待的线程共享相同的FIFO队列。通常,实现子类只支持这些模式之一,
* 但是两者都可以在ReadWriteLock中发挥作用。
* 仅支持互斥或仅共享模式的子类不需要定义支持未使用模式的方法。
*
* 该类为内部队列提供检查,检测和监控方法,以及条件对象的类似方法。
* 这些可以根据需要导出到类中,使用AbstractQueuedSynchronizer进行同步机制。
*
* 此类的序列化仅存储底层的原子整数维护状态,因此反序列化的对象具有空的线程队列。
* 需要序列化性的典型子类将定义一个readObject方法,该方法可在反序列化时将其恢复为已知的初始状态。
*
* 用法:
* 使用这个类用作同步的基础上,重新定义以下方法,如适用,通过检查和或修改使用所述同步状态getState()、setState(int)、compareAndSetState(int, int) :
* tryAcquire(int)
* tryRelease(int)
* tryAcquireShared(int)
* tryReleaseShared(int)
* isHeldExclusively()
* 每个这些方法默认抛出UnsupportedOperationException。
* 这些方法的实现必须是线程安全的,通常应该是短暂的而不是阻塞的。
* 定义这些方法是唯一支持使用此类的方法。
* 所有其他方法都被声明为final ,因为它们不能独立变化。
*
* 即使这个类基于内部FIFO队列,它也不会自动执行FIFO采集策略。
* 排他同步的核心形式如下:
*
* Acquire:
* while (!tryAcquire(arg)) {
* enqueue thread if it is not already queued;
* possibly block current thread;
* }
*
* Release:
* if (tryRelease(arg))
* unblock the first queued thread;
*
* @since 1.5
* @author Doug Lea
*/

1.2 继承体系

1.2.1 继承体系图

AQS继承体系图

1.2.2 AbstractOwnableSynchronizer顶层抽象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package java.util.concurrent.locks;

/**
* 可能是线程专有的同步器。
* 此类为创建可能涉及所有概念的锁和相关的同步器提供了基础。
* AbstractOwnableSynchronizer类本身不管理或使用此信息。
* 然而,子类和工具可能会使用适当维护的值来帮助控制和监视访问并提供诊断。
* A synchronizer that may be exclusively owned by a thread. This
* class provides a basis for creating locks and related synchronizers
* that may entail a notion of ownership. The
* {@code AbstractOwnableSynchronizer} class itself does not manage or
* use this information. However, subclasses and tools may use
* appropriately maintained values to help control and monitor access
* and provide diagnostics.
*
* @since 1.6
* @author Doug Lea
*/
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {

/** 即使所有字段都是瞬态的(不序列化),也要使用序列号。*/
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;

/**
* 供子类使用的空构造函数。
* Empty constructor for use by subclasses.
*/
protected AbstractOwnableSynchronizer() { }

/**
* 互斥模式同步下的当前线程拥有者。
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;

/**
* 设置当前拥有互斥访问的线程。
* null参数表示没有线程拥有访问权限。
* 此方法不以其他方式强加任何同步或volatile字段访问。
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread 拥有者线程
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}

/**
* 返回上一次由setExclusiveOwnerThread或null设置的线程(如果从未设置)。
* 否则,此方法不会强制进行任何同步或volatile字段访问。
* Returns the thread last set by {@code setExclusiveOwnerThread},
* or {@code null} if never set. This method does not otherwise
* impose any synchronization or {@code volatile} field accesses.
* @return the owner thread 返回拥有者线程
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}

2. AQS源码分析

2.1 成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import sun.misc.Unsafe;

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

/** 序列化UID */
private static final long serialVersionUID = 7373984972572414691L;


/**
* 等待队列的头,延迟初始化。除初始化外,只能通过setHead方法进行修改。
* 注意: 如果head存在,则保证其waitStatus不为CANCELLED。
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;

/**
* 等待队列的尾部,延迟初始化。
* 仅通过方法enq进行修改以添加新的等待节点。
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;

/**
* 控制加锁解锁的状态变量,用于同步状态。
* The synchronization state.
*/
private volatile int state;

/**
* 旋转超时阈值
* 旋转比使用定时停驻更快的纳秒数。粗略估计足以在非常短的超时时间内提高响应能力。
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;



/**
* 设置为支持compareAndSet。我们需要在这里本地实现:
* 为了允许将来的增强,我们不能显式地继承AtomicInteger的子类,否则它将是高效且有用的。
* 因此,作为罪恶之源,我们使用热点内在API本机实现。
* 当我们这样做时,我们会对其他可CAS字段执行相同的操作(否则可以通过原子字段更新程序来完成)。
* unsafe: Unsafe类主要提供了像C语言的指针一样操作内存空间的能力,可以直接操作内存,不受JVM和GC管理。
* stateOffset: 状态变量state的内存地址偏移量
* headOffset: 头节点的内存地址偏移量
* tailOffset: 尾节点的内存地址偏移量
* waitStatusOffset: 等待状态的内存地址偏移量(Node的属性)
* nextOffset: 下一个节点的内存地址偏移量(Node的属性)
* Setup to support compareAndSet. We need to natively implement
* this here: For the sake of permitting future enhancements, we
* cannot explicitly subclass AtomicInteger, which would be
* efficient and useful otherwise. So, as the lesser of evils, we
* natively implement using hotspot intrinsics API. And while we
* are at it, we do the same for other CASable fields (which could
* otherwise be done with atomic field updaters).
*/
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

}

2.2 静态初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* 静态初始化5个偏移量。
* 通过反射getDeclaredField获取本类所有的字段,包括private的,但是不能获取继承来的字段。
* 利用unsafe.objectFieldOffset获取当前字段相对Java对象的“起始地址”的偏移量然后并赋值。
* 如果发生异常,直接抛出Error。
*/
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));

} catch (Exception ex) { throw new Error(ex); }
}

2.3 构造器

1
2
3
4
5
6
/**
* 创建一个新的AbstractQueuedSynchronizer}实例,初始同步状态(state)为零。
* Creates a new {@code AbstractQueuedSynchronizer} instance
* with initial synchronization state of zero.
*/
protected AbstractQueuedSynchronizer() { }

2.4 重要方法

2.4.1 state的getter/setter方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 返回同步状态的当前值。此操作具有volatile读取的内存语义。
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value 返回当前状态值
*/
protected final int getState() {
return state;
}

/**
* 设置同步状态的值。此操作具有volatile写操作的内存语义。
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value 新的状态值
*/
protected final void setState(int newState) {
state = newState;
}

2.4.2 compareAndSetState方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* CAS(CompareAndSwap)原子操作:比较与设置state值
* 如果当前状态值等于期望值,则以原子方式将同步状态的值设置为给定的新值。
* 此操作具有volatile读取和写入的内存语义。
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value 期望值
* @param update the new value 新值
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value. 若设置成功返回true,反之返回false
*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2.4.3 tryAcquire方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 尝试以独占/互斥/排他模式进行获取锁。
* 该方法应查询对象的状态是否允许以独占模式获取对象,如果是,则查询该对象。
* 此方法总是由执行获取的线程调用。如果此方法报告失败,则获取方法可以将线程排队(如果尚未排队),
* 直到被其他线程释放发出信号。可以使用实现方法Lock#tryLock()
*
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
*
* 默认的实现抛出UnsupportedOperationException,交给子类去实现
* <p>The default
* implementation throws {@link UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like. 获取锁的参数
* @return {@code true} if successful. Upon success, this object has
* been acquired. 若成功获取锁,返回true,反之返回false
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

2.4.4 tryRelease方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 在互斥模式下尝试设置state来表示释放锁
* Attempts to set the state to reflect a release in exclusive
* mode.
*
* 这个方法总是由正在执行的线程释放锁
* <p>This method is always invoked by the thread performing release.
*
* 默认的实现抛出UnsupportedOperationException,交给子类去实现
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like. 释放参数
* @return {@code true} if this object is now in a fully released
* state, so that any waiting threads may attempt to acquire;
* and {@code false} otherwise. 返回true,如果此对象现在处于完全释放状态,则任何等待线程都可以尝试获取;反之返回false
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

2.4.5 tryAcquireShared方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 以共享模式获取锁
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
* mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return a negative value on failure; zero if acquisition in shared
* mode succeeded but no subsequent shared-mode acquire can
* succeed; and a positive value if acquisition in shared
* mode succeeded and subsequent shared-mode acquires might
* also succeed, in which case a subsequent waiting thread
* must check availability. (Support for three different
* return values enables this method to be used in contexts
* where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

2.4.6 tryReleaseShared方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/** 
* 共享模式下的释放锁
* Attempts to set the state to reflect a release in shared mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this release of shared mode may permit a
* waiting acquire (shared or exclusive) to succeed; and
* {@code false} otherwise
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

2.4.7 isHeldExclusively方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 返回是否以独占模式获有锁
* Returns {@code true} if synchronization is held exclusively with
* respect to the current (calling) thread. This method is invoked
* upon each call to a non-waiting {@link ConditionObject} method.
* (Waiting methods instead invoke {@link #release}.)
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}. This method is invoked
* internally only within {@link ConditionObject} methods, so need
* not be defined if conditions are not used.
*
* @return {@code true} if synchronization is held exclusively;
* {@code false} otherwise 如果同步过程中保持互斥模式,返回true,反之返回false
* @throws UnsupportedOperationException if conditions are not supported
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

2.4.8 addWaiter方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 通过当前线程和给定的模式(独占/共享模式)来创建节点并进行节点入队
* Creates and enqueues node for current thread and given mode.
*
* Node.EXCLUSIVE表示独占模式,Node.SHARED表示共享模式
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node 返回新的节点
*/
private Node addWaiter(Node mode) {
// 新建与一个当前线程关联的node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 首先为当前线程以指定模式构建一个Node节点,然后尝试快速入队的方式加入到同步队列中去
// 获取尾结点
Node pred = tail;
// 如果尾结点不为空
if (pred != null) {
// 将尾结点链接到新节点的前置节点
node.prev = pred;
// CAS尝试将新建的node加入到队尾
if (compareAndSetTail(pred, node)) {
// 如果CAS成功,将之前的尾结点的后继节点设置为新节点指针,即新节点e加入到队尾
pred.next = node;
// 返回设置好的节点
return node;
}
}
// 如果没有尾结点,则初始化队列
enq(node);
// 返回当前节点
return node;
}

2.4.9 enq方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 将节点插入队列,必要时进行初始化。
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert 要插入的节点
* @return node's predecessor 返回节点的前置节点
*/
private Node enq(final Node node) {
// 通过一个无条件的循环,知道将构建一个空的head,然后将当前节点加入到空的head的后面,构成一个同步队列
for (;;) {
// 获取尾结点
Node t = tail;
if (t == null) { // Must initialize 若尾结点为null,必须初始化
// CAS尝试设置新节点为head
if (compareAndSetHead(new Node()))
// CAS成功后,头节点就是尾结点
tail = head;
} else {
// 如果尾结点不为null,将node的前置节点设置为t(尾结点)
node.prev = t;
// CAS将node设置为尾结点,设置成功即tail就是node
if (compareAndSetTail(t, node)) {
// 设置t的下一个节点是node节点,即将node节点是尾结点
t.next = node;
// 返回节点的前置节点
return t;
}
}
}
}

2.4.10 setHead方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 将队列头设置为节点,从而出队。
* 仅由获取方法调用。出于GC的考虑,还会清空未使用的字段,并抑制不必要的信号和遍历。
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
// 将node设置为头结点
head = node;
// node的线程变量设置为null
node.thread = null;
// node的前置节点设置为null
node.prev = null;
}

2.4.11 acquireQueued方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 以互斥的不间断模式获取已经在队列中的线程。用于条件等待方法以及获取。
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node node节点
* @param arg the acquire argument 获取的参数
* @return {@code true} if interrupted while waiting 若在等待时中断返回true
*/
final boolean acquireQueued(final Node node, int arg) {
// 定义失败的标志变量,默认为true
boolean failed = true;
try {
// 定义中断的标志变量,默认为true
boolean interrupted = false;
// 无条件循环
for (;;) {
// 获取node节点的前置节点
final Node p = node.predecessor();
// //判断前置节点是不是head节点,如果是的话,则会再次尝试去获取锁
if (p == head && tryAcquire(arg)) {
// 将node设置为头节点
setHead(node);
// p的后继节点设置为null
p.next = null; // help GC 方便GC
// 设置失败标志failed为false
failed = false;
// 返回中断标志interrupted为false
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // 如果前一个节点不是head节点,则设置前一个非取消节点的状态是signal,以确保在前一个线程释放锁的时候能唤醒当前线程
parkAndCheckInterrupt()) // 挂起当前线程,并且返回当前线程是否被中断过(会清空中断状态,只有在被唤醒的时候才能从park()方法返回)
interrupted = true; // 设置中断标志为true
}
} finally {
if (failed)
// 如果被中断失败过,则把该节点从同步队列中删除
cancelAcquire(node);
}
}

2.4.12 selfInterrupt方法

1
2
3
4
5
6
7
8
/**
* 一种简便的方式中断当前线程
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
// 这个就很简单了,直接调用当前线程的interrupt()
Thread.currentThread().interrupt();
}

2.4.13 acquire方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 以独占模式获取,忽略中断。
* 通过至少调用一次tryAcquire的实现返回成功。
* 否则,线程将排队,有可能反复阻塞和解除阻塞,调用#tryAcquire直到成功。
* 此方法可以用于实现方法Lock#lock()。
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果tryAcquire获取锁失败,中断当前线程,并且创建一个代表当前线程的结点加入到等待队列的尾部
// 将新加入的结点放入队列之后,这个结点有两种状态,要么获取锁,要么就挂起,如果这个结点不是头结点的后继节点,就看看这个结点是否应该挂起,如果应该挂起,就挂起当前结点,是否应该挂起是通过shouldParkAfterFailedAcquire方法来判断的  
selfInterrupt();
}

2.4.14 transferForSignal方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 将条件等待队列节点转移到AQS的FIFO同步队列中。
* 如果成功,则返回true。否则该node在signal之前被取消。
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* 如果CAS失败,无法更改waitStatus,则该节点在signal之前被取消了,返回false。
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* 追加到队列上,然后尝试将前一个节点的waitStatus设置为表示线程正在(可能)正在等待。
* 如果取消设置或尝试设置waitStatus失败,请唤醒以重新同步(在这种情况下,waitStatus可能会短暂无害地出现错误)。
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 进行入队操作
Node p = enq(node);
// 获取入队节点的前继节点的状态
int ws = p.waitStatus;
// 若前继节点为Cancelled节点(状态为1,大于0的都是取消节点),或非取消节点CAS设置为SIGNAL延迟唤醒,通过LockSupport.unpark唤醒线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
// 唤醒成功后返回true
return true;
}

2.4.15 fullyRelease方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 用当前state值调用释放;返回保存状态。
* 取消节点并在失败时引发异常。
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait 此等待的条件节点
* @return previous sync state 先前的同步状态
*/
final int fullyRelease(Node node) {
// 定义failed,作为失败标记
boolean failed = true;
try {
// 获取state状态值
int savedState = getState();
// 根据状态释放
if (release(savedState)) {
// 释放成功,失败变量标记为false
failed = false;
// 返回先前的状态
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
// 如果取消节点被释放失败,则节点重新设置为等待节点
node.waitStatus = Node.CANCELLED;
}
}

2.4.16 isOnSyncQueue方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 判断节点是否在同步队列中。
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
// 判断节点的状态,如果状态是CONDITION,说明节点肯定不在同步队列中,同时哪怕同步队列是刚刚初始化的,也会有一个冗余的头节点存在,所以节点的前驱节点如果为null,那么节点也肯定不在同步队列中,返回fasle
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 节点的后继节点不为null,说明节点肯定在队列中,返回true,这里很重要的一点要明白,prev和next都是针对同步队列的节点
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* 调用findNodeFromTail,具体返回结果看findNodeFromTail()方法返回值,可以把这个方法想象成一个兜底的方法.
* node.prev可以为非null,但尚未进入队列,因为将其放入队列的CAS可能会失败。因此,我们必须从尾部遍历以确保它确实做到了。
* 在此方法的调用中,它将始终处于尾部,并且除非CAS失败(这不太可能),否则它将一直存在,因此我们几乎不会遍历太多。
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}

2.4.17 findNodeFromTail方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 如果节点在同步队列中(从尾向后搜索),则返回true。
* 仅在isOnSyncQueue需要时调用。
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
// 取尾结点
Node t = tail;
for (;;) {
// 如果尾结点就是当前的node,则表示node节点在同步队列中,直接返回true。
if (t == node)
return true;
// 如果尾结点为null,直接返回false
if (t == null)
return false;
// 将指针前移动,指向当前节点t的前一个节点
t = t.prev;
}
}

2.4.18 acquireInterruptibly方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 以独占模式获取锁,如果中断则中止。
* 否则,线程将排队,并可能反复阻塞和解除阻塞,然后调用tryAcquire
* 直到成功或线程被中断为止。该方法可以用于实现Lock#lockInterruptible方法。
*
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted 如果当前线程被中断抛出InterruptedException
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 如果当前线程被中断抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取锁
if (!tryAcquire(arg))
// 如果没有获取锁,在排他可中断模式下再次获取锁。
doAcquireInterruptibly(arg);
}

2.4.19 release方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 在独占模式下释放锁。
* 如果tryRelease返回true,则通过解锁一个或多个线程来实现。
* 方法可用于Lock#unlock方法实现
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
// 根据参数释放锁
if (tryRelease(arg)) {

// 取出head节点
Node h = head;
// 如果head节点不为null,并且head的等待状态不是待等待状态
if (h != null && h.waitStatus != 0)
// 如果存在后继者,唤醒节点的后继节点
unparkSuccessor(h);
// 如果释放成功,返回true
return true;
}
// 释放锁失败
return false;
}

2.4.20 unparkSuccessor方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 唤醒节点的后继者(如果存在)。
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* 如果状态为负(即可能需要signal),请尝试以清除预期的信号。如果此失败或通过等待线程更改状态,则可以。
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;

if (ws < 0)
// 如果小于0,即为SIGNAL、CONDITION、PROPAGATE三种之一的状态,CAS设置为等待状态为0(在队列中,等待获取锁)
compareAndSetWaitStatus(node, ws, 0);

/*
* 释放线程将保留在后续线程中,该线程通常仅是下一个节点。但是,如果取消或看似为零,则从尾部向后移动以找到实际的不可取消的后继节点。
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取node的后继节点
Node s = node.next;
// 如果后继节点为null,或者不为null时且后继节点的状态为取消节点时
if (s == null || s.waitStatus > 0) {
// 设置后继节点为null
s = null;
// 从尾部遍历,直到前继节点为null或前继节点是node节点
for (Node t = tail; t != null && t != node; t = t.prev)
// 如果当前节点只要不是取消节点,则将设置s,即node的后继节点为t
if (t.waitStatus <= 0)
s = t;
}
// 如果s不等于null,进行唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}

2.4.21 hasQueuedThreads方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 查询是否有任何线程正在等待获取。请注意因为由于中断和超时引起的取消可能随时发生,所以返回值true不能保证任何其他线程都可以获取。
* Queries whether any threads are waiting to acquire. Note that
* because cancellations due to interrupts and timeouts may occur
* at any time, a {@code true} return does not guarantee that any
* other thread will ever acquire.
*
* <p>In this implementation, this operation returns in
* constant time. 在此实现中,此操作按固定时间返回。
*
* @return {@code true} if there may be other threads waiting to acquire 如果可能还有其他线程在等待获取返回true
*/
public final boolean hasQueuedThreads() {
// 只要头结点不是尾结点,则说明队列中还有其他的线程
return head != tail;
}

2.4.22 hasContended方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 查询是否有任何线程竞争该同步器;也就是说,如果获取方法曾经被阻止。
* Queries whether any threads have ever contended to acquire this
* synchronizer; that is if an acquire method has ever blocked.
*
* <p>In this implementation, this operation returns in
* constant time. 在此实现中,此操作按固定时间返回
*
* @return {@code true} if there has ever been contention
*/
public final boolean hasContended() {
// 只要head不为null,说明有竞争
return head != null;
}

2.4.23 getFirstQueuedThread方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 返回队列中的第一个(等待时间最长)线程,如果当前没有队列,则返回null。
* Returns the first (longest-waiting) thread in the queue, or
* {@code null} if no threads are currently queued.
*
* 在此实现中,此操作通常在固定时间内返回,但是如果其他线程正在同时修改队列,则可能会在争用中迭代。
* <p>In this implementation, this operation normally returns in
* constant time, but may iterate upon contention if other threads are
* concurrently modifying the queue.
*
* @return the first (longest-waiting) thread in the queue, or
* {@code null} if no threads are currently queued
*/
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay 仅处理快速路径,否则中继
// 如果头结点是尾结点说明没有等待线程,直接返回null,否则调用fullGetFirstQueuedThread()获取等待线程
return (head == tail) ? null : fullGetFirstQueuedThread();
}

2.4.24 fullGetFirstQueuedThread方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 快速路径找不到时时调用的getFirstQueuedThread版本
* Version of getFirstQueuedThread called when fastpath fails
*/
private Thread fullGetFirstQueuedThread() {
/*
* 第一个节点通常是head.next。尝试获取其线程字段,确保读取一致:
* 如果thread字段被清空或s.prev不再是head,则一些其他线程在我们的某些读取之间同时执行setHead。
* 在遍历之前,我们尝试两次。
* The first node is normally head.next. Try to get its
* thread field, ensuring consistent reads: If thread
* field is nulled out or s.prev is no longer head, then
* some other thread(s) concurrently performed setHead in
* between some of our reads. We try this twice before
* resorting to traversal.
*/
// 定义两个节点变量,h表示头节点,s表示后继节点
Node h, s;
// 定义一个线程变量
Thread st;
// 条件1:如果头节点不为null,并且头节点的后继节点s不为null,并且后继节点s的前置节点是头节点head(线程不安全,需要再检测),并且head的后继节点线程不为null
// 条件2:如果头节点不为null,并且头节点的后继节点s不为null,并且后继节点s的前置节点是头节点head(线程不安全,需要再检测),并且head的后继节点线程不为null
// 条件1||条件2,若一方为true,则返回true
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
// 直接返回st线程
return st;

/*
* Head的下一个字段可能尚未设置,或者在setHead之后可能未设置。
* 因此,我们必须检查tail是否实际上是第一个节点。如果没有,我们继续安全地从尾节点到头节点找到保证终止。
* Head's next field might not have been set yet, or may have
* been unset after setHead. So we must check to see if tail
* is actually first node. If not, we continue on, safely
* traversing from tail back to head to find first,
* guaranteeing termination.
*/
// 获取尾结点
Node t = tail;
// 定义firstThread变量,用于存在第一个线程
Thread firstThread = null;
// 如果尾结点不为null,并且尾结点不是头节点,向前循环遍历
while (t != null && t != head) {
// 获取当前变量t的线程变量tt
Thread tt = t.thread;
// 如果线程变量不为null,则赋值于firstThread
if (tt != null)
firstThread = tt;
// 指针迁移
t = t.prev;
}
// 返回firstThread,表示第一个等待线程
return firstThread;
}

3.内部类

3.1 等待队列节点类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
/**
* 等待队列节点类。
*
* 等待队列是“CLH”(Craig,Landin和Hagersten,CLH是一种基于链表的高性能、公平的自旋锁.)锁队列的变体。
* CLH锁通常用于自旋锁(spinlocks)。我们将它用于阻塞同步器,仅仅使用相同的基本策略,即在其前置节点中保存有关线程的某些控制信息。
* 每个节点中的“status”字段跟踪线程是否应阻塞。节点的前置节点释放时会发出信号。否则,队列的每个节点都将用作持有单个等待线程的特定通知样式的监视器。
* 虽然状态字段不控制是否授予线程锁等。线程可能会尝试获取它是否在队列中的第一位。但是先行并不能保证成功。它只赋予了竞争的权利。
* 因此,当前释放的竞争线程可能需要重新等待。
*
* 要加入CLH锁,您可以将其自动拼接成新的尾部。要出队,您只需设置head字段。
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
*
*
* 插入CLH队列只需要对“tail”执行一次原子操作,因此存在一个简单的原子分界,即从未排队到排队。
* 同样,使出队仅涉及更新“head”。但是,节点需要花费更多的精力来确定其后继节点,部分是由于超时和中断而可能导致的取消。
*
*
* 主要需要“prev”链接(在原始CLH锁中不使用)来处理取消。
* 如果取消某个节点,则其后继节点(通常)会重新链接到一个未取消的前驱节点。
* 有关自旋锁情况下类似机制的说明,请参见Scott和Scherer的论文,
* 网址为: http://www.cs.rochester.edu/u/scott/synchronization/
*
*
* 我们还使用“next”链接来实现阻止机制。
* 每个节点的线程ID保留在其自己的节点中,因此前驱节点通过遍历下一个链接以确定它是哪个线程,
* 从而通知下一个节点唤醒。确定后继节点必须避免与新排队的节点竞争以设置其前任的“next”字段。
* 如果有必要,可以通过在后继节点看起来为null时从原子更新的“tail”向后检查来解决此问题。
* (或者换句话说,下一个链接是一种优化,因此我们通常不需要向后扫描。)
*
* 取消将一些保守性引入到基本算法中。
* 由于我们必须轮询其他个节点的取消,
* 因此我们可能会遗漏一个被取消的节点位于我们前面还是后面。
* 要解决此问题,必须始终取消后继者,让他们稳定一个新的前驱节点,除非我们能确定一个未取消的前驱节点将承担这一责任。
*
* CLH队列需要一个虚拟标头节点才能开始。
* 但是我们不会在构建过程中创建它们,因为如果用不竞争,那将会浪费功夫。
* 取而代之的是,构造节点,并在第一次竞争时设置头和尾指针。
*
* 等待条件的线程使用相同的节点,但是使用附加链接。条件仅需要在简单(非并行)链接队列中链接节点,因为仅当它们被互斥时才可以访问它们。
* 等待时,将节点插入条件队列。收到信号后,将节点转移到主队列。 status字段的特殊值用于标记节点所在的队列。
*
* 感谢Dave Dice、Mark Moir、Victor Luchangco、Bill、Scherer和Michael Scott,
* 以及JSR-166专家组的成员,对本课程的设计提供了有益的想法、讨论和批评。
*
*/
static final class Node {

/** 指示节点正在共享模式下等待的标记 */
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** 指示节点正在以互斥模式等待的标记 */
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus值为1,表示线程已取消 */
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus值为-1,表示后继节点持有的线程需要释放 */
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus值为-2,表示线程正在条件等待 */
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus值为-3,表示下一个acquireShared(共享模式获取锁)应该无条件传播
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

/**
* 状态字段,仅采用以下值:
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* 该节点的后继节点被(或将很快)被阻止(通过停放),因此当前节点在释放或取消时必须取消停放其后继节点。
* 为了避免种族冲突,acquire方法必须首先表明它们需要一个信号,然后重试原子获取,然后失败时阻塞。
*
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* 由于超时或中断,该节点被取消。节点永远不会离开此状态。特别是具有取消节点的线程永远不会再次阻塞。
*
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* 该节点当前在条件队列中。在传输之前,它不会用作同步队列节点,此时状态将设置为0。(此处使用此值与字段的其他用途无关,但简化了机制)。
*
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* releaseShared应该传播到其他节点。在doReleaseShared中设置(此设置仅适用于头节点),以确保传播继续进行,即使由于干预而进行了其他操作。
*
* 0: None of the above 以上都不是
*
* values按数字排列以简化使用。非负值表示节点不需要发送信号。
* 因此,大多数代码不需要检查特定的值,仅需检查符号即可。
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* 对于普通同步节点,该字段初始化为0;
* 对于条件节点,该字段初始化为CONDITION。使用CAS(或在可能时使用无条件的易失性写操作)对其进行修改。
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;

/**
* 链接到当前节点/线程所依赖的前驱节点以检查waitStatus。
* 在入队期间分配,并且仅在出队时将清空(出于GC的考虑)。
* 同样,在取消前驱节点后,我们在短路的同时找到了一个永远不会存在的不可取消的前驱节点,
* 因为头节点从未被取消:一个节点仅由于成功获取而成为头。
* 被取消的线程永远不会成功获取,并且只有一个线程会取消自身,而不会取消任何其他节点。
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;

/**
* 链接到当前节点/线程在释放时取消驻留的后继节点。
* 在排队期间分配,在绕过已取消的前任时进行调整,并在出队时清零(出于GC的原因)。
* enq操作直到附加之后才分配前驱节点的下一个字段,因此看到下一个空字段并不一定意味着节点在队列末尾。
* 但是,如果下一个字段为null,则我们可以从尾部扫描上一个进行双重检查。
* 被取消节点的下一个字段设置为*指向节点本身而不是null,以使isOnSyncQueue的工作更轻松。
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;

/**
* 使该节点排队的线程。在结构上初始化,使用后消失。
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;

/**
* 链接到等待条件的下一个节点,或者共享特殊值。
* 因为条件队列仅在处于独占模式时才被访问,所以我们只需要一个简单的链接队列即可在节点等待条件时保存节点。
* 然后将它们转移到队列以重新获取。由于条件只能是互斥的,因此我们使用特殊值来表示共享模式来保存字段。
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;

/**
* 如果节点正在共享模式下等待,则返回true。
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* 返回上一个节点,如果为null,则抛出NullPointerException。
* 在前驱节点不能为null时使用。可以忽略空检查,但它可以帮助虚拟机。
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node 该节点的前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

/**
* 空参构造
* 用于建立初始标头或SHARED标记
*/
Node() { // Used to establish initial head or SHARED marker
}

/**
* 多参数构造:支持node设置
* 由addWaiter使用
*/
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

/**
* 多参数构造:支持waitStatus设置
* 由Condition使用
*/
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

3.2 条件对象类

3.2.1 Condition顶层接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;

/**
* Condition实现可以提供Object监视器方法的行为和语义(Object中的wait、notify和notifyAll方法),例如有保证的通知顺序,或者在执行通知时不需要锁定。
* Condition本质上绑定一个锁, 要获得特定Condition实例,请使用其newCondition()方法。
* 当等待Condition时,允许发生"虚假唤醒",一般来说,作为对底层平台语义的让步。
*
* @since 1.5
* @author Doug Lea
*/
public interface Condition {

/**
* 使当前线程等待,直到收到信号或线程可以interrupt()中断
*
* @throws InterruptedException if the current thread is interrupted
* (and interruption of thread suspension is supported)
*/
void await() throws InterruptedException;

/**
* 使当前线程等待,直到发出信号。不可中断
*/
void awaitUninterruptibly();

/**
*
* 导致当前线程等待,直到被信号通知或中断,或者经过指定的等待时间(纳秒)。
*
* @param nanosTimeout the maximum time to wait, in nanoseconds
* @return an estimate of the {@code nanosTimeout} value minus
* the time spent waiting upon return from this method.
* A positive value may be used as the argument to a
* subsequent call to this method to finish waiting out
* the desired time. A value less than or equal to zero
* indicates that no time remains.
* @throws InterruptedException if the current thread is interrupted
* (and interruption of thread suspension is supported)
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;

/**
* 使当前线程等待,直到被信号通知或中断,或者经过指定的等待时间。
* 此方法在行为上等效于:awaitNanos(unit.toNanos(time)) > 0。
*
* @param time the maximum time to wait
* @param unit the time unit of the {@code time} argument
* @return {@code false} if the waiting time detectably elapsed
* before return from the method, else {@code true}
* @throws InterruptedException if the current thread is interrupted
* (and interruption of thread suspension is supported)
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;

/**
* 使当前线程等待,直到发出信号或被中断,或者指定超过了指定的期限后等待。
*
* @param deadline the absolute time to wait until
* @return {@code false} if the deadline has elapsed upon return, else
* {@code true}
* @throws InterruptedException if the current thread is interrupted
* (and interruption of thread suspension is supported)
*/
boolean awaitUntil(Date deadline) throws InterruptedException;

/**
* 唤醒一个等待线程。随机唤醒,重新获锁。
*/
void signal();

/**
* 唤醒所有等待的线程。全部唤醒。
*/
void signalAll();
}

3.2.2 ConditionObject类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
/**
* AbstractQueuedSynchronizer的Condition实现,用作于Lock实现的基础。
* Condition implementation for a {@link
* AbstractQueuedSynchronizer} serving as the basis of a {@link
* Lock} implementation.
* 此类是可序列化的,但是所有字段都是瞬态的,因此反序列化条件没有等待的节点。
*
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. 条件队列的第一个节点。 */
private transient Node firstWaiter;
/** Last node of condition queue. 条件队列的最后一个节点。*/
private transient Node lastWaiter;

/**
* 空参构造:创建一个新的ConditionObject实例。
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }

// Internal methods 内部方法

/**
* Adds a new waiter to wait queue. 添加新的等待者以等待队列。
* @return its new wait node 它的新的等待节点
*/
private Node addConditionWaiter() {
// 获取最后一个等待节点
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out. 如果lastWaiter被取消,请清除。
// 首先判断lastWaiter节点是否为空,或者是否是处于条件等待,如果不是的话则把它从等待队列中删除。
if (t != null && t.waitStatus != Node.CONDITION) {
// 剔除CONDITION节点进行重新链接
unlinkCancelledWaiters();
t = lastWaiter;
}
// 构建当前线程的条件等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
// 等待尾节点为null,此等待队列为null,把当前线程构建的条件节点加入到等待队列中去,初始化队列,此时首尾相等的队列
firstWaiter = node;
else
// 等待尾节点不为null,,把当前线程构建的条件节点加入到等待队列中去
t.nextWaiter = node;
// 设置尾结点
lastWaiter = node;
// 当前线程的条件结点
return node;
}

/**
* 从首节点向后遍历直到遇见一个非Cancelled或者为null的节点,
* 并将其移除等待队列,并添加到AQS的FIFO同步队列尾部
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue 非空条件队列上的第一个节点
*/
private void doSignal(Node first) {
do {
// 若first头节点的nex等待节点为null,尾结点就是null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 头节点设置为null
first.nextWaiter = null;
} while (!transferForSignal(first) && //从first开始遍历等待队列,把第一个非空、没取消的节点transfer到同步队列
(first = firstWaiter) != null);
}

/**
* 删除并转换所有节点。
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue 非空条件队列上的第一个节点
*/
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}

/**
* 等待队列中为取消状态的等待节点进行取消链接
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

// public methods

/**
* 将等待时间最长的线程(如果存在)从该条件的等待队列移至拥有锁的等待队列。
* 首先会判断当前线程是不是独占的持有锁,然后唤醒等待队列中的第一个等待线程。
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
* 如果调用isHeldExclusively返回false,说明不是处于独占模式则抛出 IllegalMonitorStateException
*/
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
// 头节点不为null,再doSignal
if (first != null)
doSignal(first);
}

/**
* 将所有线程从这种情况下的等待队列移至拥有锁的等待队列。
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}

/**
* 实现不间断的条件等待。
* Implements uninterruptible condition wait.
* <ol>
* 保存lock state,并且通过getState()返回
* <li> Save lock state returned by {@link #getState}.
* 以保存的state作为参数调用release(),如果失败则抛出IllegalMonitorStateException。
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* 一直阻塞直到被唤醒
* <li> Block until signalled.
* 通过调用acquire()以保存的state作为参数来重新获取锁。
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
*/
public final void awaitUninterruptibly() {
// 将当前线程作为条件节点添加到等待队列中
Node node = addConditionWaiter();
// 以当前节点的state去释放
int savedState = fullyRelease(node);
// 中断标记变量,默认为false
boolean interrupted = false;
// 如果当前节点node不在同步队列中进行循环
while (!isOnSyncQueue(node)) {
// 阻塞当前线程
LockSupport.park(this);
// 获取线程中断标志位
if (Thread.interrupted())
interrupted = true;
}
// 若线程在等待时候被中断或者本身为中断状态,那就当前线程自己去中断
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}

/*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/

/** Mode meaning to reinterrupt on exit from wait 意味着在退出等待时重新中断 */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait 在等待退出时抛出InterruptedException */
private static final int THROW_IE = -1;

/**
* 检查中断,如果被中断,则返回THROW_IE。
* 如果在信号通知之前被中断,则返回REINTERRUPT(如果在信号通知之后),或者如果未中断则返回0.
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

/**
* 抛出InterruptedException,重新中断当前线程,或不执行任何操作。
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
// 为THROW_IE Mode时,抛出InterruptedException
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
// 为REINTERRUPT时,线程自身中断
selfInterrupt();
}

/**
* 实现可中断条件等待。
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
// 如果当前线程被中断,则抛出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程作为条件等待节点并添加到等待队列中
Node node = addConditionWaiter();
// 释放锁,并返回state状态
int savedState = fullyRelease(node);
// 未中断标志位
int interruptMode = 0;
// 若当前节点不在AQS同步队列中,循环
while (!isOnSyncQueue(node)) {
// 阻塞当前节点
LockSupport.park(this);
// 检查是否中断过,如果中断过,结束循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 若线程在等待时候被中断并且中断模式不是THROW_IE不可中断模式,设置中断模式为REINTERRUPT可重新中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled 若node的下一个等待节点不为null,解除取消节点的链接
unlinkCancelledWaiters();
if (interruptMode != 0) // 如果有过中断,重新中断
reportInterruptAfterWait(interruptMode);
}

/**
* 执行定时条件等待。
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}

/**
* 实现绝对定时条件等待。
* Implements absolute timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

/**
* 实现定时条件等待。可指定时间单位
* Implements timed condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled, interrupted, or timed out.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* <li> If timed out while blocked in step 4, return false, else true.
* </ol>
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}

// support for instrumentation

/**
* 如果此条件是由给定的同步对象创建的,则返回true。
* Returns true if this condition was created by the given
* synchronization object.
*
* @return {@code true} if owned
*/
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}

/**
* 查询在这个等待条件对象上是否含有等待的线程。
* Queries whether any threads are waiting on this condition.
* Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}

/**
* 获取等待队列的大概长度(因为查询时没有加锁,并不能保证在查找时有没有其他新节点加入到队列)
* Returns an estimate of the number of threads waiting on
* this condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}

/**
* 返回一个包含可能正在等待此条件的线程的集合。
* Returns a collection containing those threads that may be
* waiting on this Condition.
* Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
*
* @return the collection of threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}

4. AQS总结

4.1 AQS的核心思想与实现

AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

AQS是将每一条请求共享资源的线程封装成一个CLH锁(双向链表)队列的一个结点(Node),来实现锁的分配。

里面维护了用volatile修饰共享变量state(0未锁定,1锁定),线程通过CAS去改变状态,成功则获取锁成功,失败则进入等待队列,等待被唤醒。

AQS是自旋锁,在等待唤醒时候经常使用如下代码不停的尝试获取锁:

1
2
3
4
while (!tryAcquire(arg)) {
// enqueue thread if it is not already queued;
// possibly block current thread;
}

AQS定义了两种资源获取方式:独占(只有一个线程能访问执行,又根据是否按队列的顺序分为公平锁和非公平锁,如ReentrantLock) 和共享(多个线程可同时访问执行,如Semaphore/CountDownLatch,Semaphore、CountDownLatCh、 CyclicBarrier )。ReentrantReadWriteLock 可以看成是组合式,允许多个线程同时对某一资源进行读。

4.2 模板方法的抽象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 获取互斥锁
public final void acquire(int arg) {
// tryAcquire(arg)需要子类实现
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 获取互斥锁可中断
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquire(arg)需要子类实现
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 获取共享锁
public final void acquireShared(int arg) {
// tryAcquireShared(arg)需要子类实现
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 获取共享锁可中断
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// tryAcquireShared(arg)需要子类实现
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 释放互斥锁
public final boolean release(int arg) {
// tryRelease(arg)需要子类实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 释放共享锁
public final boolean releaseShared(int arg) {
// tryReleaseShared(arg)需要子类实现
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

上述几个方法贯穿于ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch的源码解析中,AQS为我们打开了一扇知识大门。

5.参考资料

支付宝打赏 微信打赏

请作者喝杯咖啡吧