AbstractQueuedSynchronizer 原理分析 - 独占/共享模式

概念

AbstractQueuedSynchronizer (抽象队列同步器,简称 AQS),AQS 是很多同步器的基础框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的。除此之外,我们还可以基于 AQS,定制出我们所需要的同步器。

原理

在 AQS 内部,通过维护一个FIFO 队列来管理多线程的排队工作。在公平竞争的情况下,无法获取同步状态的线程将会被封装成一个节点,置于队列尾部。入队的线程将会通过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。大致示意图如下:

当头结点释放同步状态后,且后继节点对应的线程被阻塞,此时头结点线程将会去唤醒后继节点线程。后继节点线程恢复运行并获取同步状态后,会将旧的头结点从队列中移除,并将自己设为头结点。大致示意图如下:

常用方法

主要介绍三组重要的方法,通过使用这三组方法即可实现一个同步组件。
第一组方法是用于访问/设置同步状态的,如下:

方法说明
int getState()设置同步状态
void setState()设置同步状态
boolean compareAndSetState(int expect, int update)通过 CAS 设置同步状态

第二组方需要由同步组件覆写。如下:

方法说明
boolean tryAcquire(int arg)独占式获取同步状态
boolean tryRelease(int arg)独占式释放同步状态
int tryAcquireShared(int arg)共享式获取同步状态
boolean tryReleaseShared(int arg)共享式私房同步状态
boolean isHeldExclusively()检测当前线程是否获取独占锁

第三组方法是一组模板方法,同步组件可直接调用。如下:

方法说明
void acquire(int arg)独占式获取同步状态,该方法将会调用 tryAcquire 尝试获取同步状态。获取成功则返回,获取失败,线程进入同步队列等待。
void acquireInterruptibly(int arg)响应中断版的 acquire
boolean tryAcquireNanos(int arg,long nanos)超时+响应中断版的 acquire
void acquireShared(int arg)共享式获取同步状态,同一时刻可能会有多个线程获得同步状态。比如读写锁的读锁就是就是调用这个方法获取同步状态的。
void acquireSharedInterruptibly(int arg)响应中断版的 acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos)超时+响应中断版的 acquireShared
boolean release(int arg)独占式释放同步状态
boolean releaseShared(int arg)共享式释放同步状态

上面列举了一堆方法,看似繁杂。但稍微理一下,就会发现上面诸多方法无非就两大类:一类是独占式获取和释放独占状态,另一类是共享式获取和释放同步状态。

源码

线程队列节点结构

在并发的情况下,AQS 会将未获取同步状态的线程将会封装成节点,并将其放入同步队列尾部。同步队列中的节点除了要保存线程,还要保存等待状态。不管是独占式还是共享式,在获取状态失败时都会用到节点类。所以这里我们要先看一下节点类的实现,为后面的源码分析进行简单铺垫。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
static final class Node {

/** 共享类型节点,标记节点在共享模式下等待 */
static final Node SHARED = new Node();

/** 独占类型节点,标记节点在独占模式下等待 */
static final Node EXCLUSIVE = null;

/*
* 这里的状态,我的个人理解是,对下一个对象的操作行为
* 如果是CANCELLED 表示:当前线程已经释放锁
* 如果是0(默认状态) 表示:下一个线程正在运行中,不需要唤醒
* 如果是SIANAL、CONDITION、propagate 表示:线程正在等待中,需要前节点释放同步状态后,来唤醒
*/
/** 当前线程取消竞争 */
static final int CANCELLED = 1;

/**
* 后置节点等待被唤醒
*/
static final int SIGNAL = -1;

/** 条件等待。表明节点等待在 Condition 上 */
static final int CONDITION = -2;

/**
* 和共享有关
*/
static final int PROPAGATE = -3;

/**
* 等待状态,取值如下:
* SIGNAL,
* CANCELLED,
* CONDITION,
* PROPAGATE,
* 0
*
* 初始情况下,waitStatus = 0
*/
volatile int waitStatus;

/**
* 前驱节点
*/
volatile Node prev;

/**
* 后继节点
*/
volatile Node next;

/**
* 对应的线程
*/
volatile Thread thread;

/**
* 下一个等待节点,和condition 有关,用在 ConditionObject 中
*/
Node nextWaiter;

/**
* 判断节点是否是共享节点
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

// CLH队列的头结点,其不包含线程信息,head永远为null
private transient volatile Node head;

// CLH队列的尾节点,每次新加一个节点都会尾插到最后
private transient volatile Node tail;

// 当前锁被占据的次数,因为可以被一个线程重复占据,所以其值可以大于0
// 没有线程占据,其值就是0
private volatile int state;

// 当前运行的线程,也是占据锁的线程,注意和CLH中的线程无关
private transient Thread exclusiveOwnerThread;

/**
* 获取前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

/** addWaiter 方法会调用该构造方法 */
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

/** Condition 中会用到此构造方法 */
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

独占模式

获取同步状态

独占式获取同步状态时通过 acquire 进行的,下面来分析一下该方法的源码。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
/**
* 该方法将会调用子类复写的 tryAcquire 方法获取同步状态,
* - 获取成功:直接返回
* - 获取失败:将线程封装在节点中,并将节点置于同步队列尾部,
* 通过自旋尝试获取同步状态。如果在有限次内仍无法获取同步状态,
* 该线程将会被 LockSupport.park 方法阻塞住,直到被前驱节点唤醒
* arg 是一个扩展字段,或者说是调用发的一个信号量
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

/** 向同步队列尾部添加一个节点 */
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试以快速方式将节点添加到队列尾部
// tail是当前队列的尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 使用cas(乐观锁), 保证队列的更新
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}

// 快速插入节点失败,调用 enq 方法,不停的尝试插入节点
// 初始化 或者 等待cas结果的一致
enq(node);
return node;
}

/**
* 通过 CAS + 自旋的方式插入节点到队尾
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 设置头结点,初始情况下,头结点是一个空节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
/*
* 将节点插入队列尾部。这里是先将新节点的前驱设为尾节点,之后在尝试将新节点设为尾节
* 点,最后再将原尾节点的后继节点指向新的尾节点。除了这种方式,我们还先设置尾节点,
* 之后再设置前驱和后继,即:
*
* if (compareAndSetTail(t, node)) {
* node.prev = t;
* t.next = node;
* }
*
* 但如果是这样做,会导致一个问题,即短时内,队列结构会遭到破坏。考虑这种情况,
* 某个线程在调用 compareAndSetTail(t, node)成功后,该线程被 CPU 切换了。此时
* 设置前驱和后继的代码还没带的及执行,但尾节点指针却设置成功,导致队列结构短时内会
* 出现如下情况:
*
* +------+ prev +-----+ +-----+
* head | | <---- | | | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* tail 节点完全脱离了队列,这样导致一些队列遍历代码出错。如果先设置
* 前驱,在设置尾节点。及时线程被切换,队列结构短时可能如下:
*
* +------+ prev +-----+ prev +-----+
* head | | <---- | | <---- | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* 这样并不会影响从后向前遍历,不会导致遍历逻辑出错。
*
* 参考:
* https://www.cnblogs.com/micrari/p/6937995.html
*/
// 这个代码细节写的very/very/very good, 很精妙
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

/**
* 同步队列中的线程在此方法中以循环尝试获取同步状态,在有限次的尝试后,
* 若仍未获取锁,线程将会被阻塞,直至被前驱节点的线程唤醒。
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 循环获取同步状态
for (;;) {
final Node p = node.predecessor();
/*
* 前驱节点如果是头结点,表明前驱节点已经获取了同步状态。前驱节点释放同步状态后,
* 在不出异常的情况下, tryAcquire(arg) 应返回 true。此时节点就成功获取了同
* 步状态,并将自己设为头节点,原头节点出队。
*/
if (p == head && tryAcquire(arg)) {
// 成功获取同步状态,设置自己为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

/*
* 如果获取同步状态失败,则根据条件判断是否应该阻塞自己。
* 如果不阻塞,CPU 就会处于忙等状态,这样会浪费 CPU 资源
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
/*
* 如果在获取同步状态中出现异常,failed = true,cancelAcquire 方法会被执行。
* tryAcquire 需同步组件开发者覆写,难免不了会出现异常。
*/
if (failed)
cancelAcquire(node);
}
}

/** 设置头节点 */
private void setHead(Node node) {
// 仅有一个线程可以成功获取同步状态,所以这里不需要进行同步控制
head = node;
node.thread = null;
node.prev = null;
}

/**
* 该方法主要用途是,当线程在获取同步状态失败时,根据前驱节点的等待状态,决定后续的动作。比如前驱
* 节点等待状态为 SIGNAL,表明当前节点线程应该被阻塞住了。不能老是尝试,避免 CPU 忙等。
* —————————————————————————————————————————————————————————————————
* | 前驱节点等待状态 | 相应动作 |
* —————————————————————————————————————————————————————————————————
* | SIGNAL | 阻塞 |
* | CANCELLED | 向前遍历, 移除前面所有为该状态的节点 |
* | waitStatus < 0 | 将前驱节点状态设为 SIGNAL, 并再次尝试获取同步状态 |
* —————————————————————————————————————————————————————————————————
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
/*
* 前驱节点等待状态为 SIGNAL,表示当前线程应该被阻塞。
* 线程阻塞后,会在前驱节点释放同步状态后被前驱节点线程唤醒
*/
if (ws == Node.SIGNAL)
return true;

/*
* 前驱节点等待状态为 CANCELLED,则以前驱节点为起点向前遍历,
* 移除其他等待状态为 CANCELLED 的节点。
*/
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 等待状态为 0 或 PROPAGATE,设置前驱节点等待状态为 SIGNAL,
* 并再次尝试获取同步状态。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private final boolean parkAndCheckInterrupt() {
// 调用 LockSupport.park 阻塞自己
LockSupport.park(this);
return Thread.interrupted();
}

/**
* 取消获取同步状态
*/
private void cancelAcquire(Node node) {
if (node == null)
return;

node.thread = null;

// 前驱节点等待状态为 CANCELLED,则向前遍历并移除其他为该状态的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// 记录 pred 的后继节点,后面会用到
Node predNext = pred.next;

// 将当前节点等待状态设为 CANCELLED
node.waitStatus = Node.CANCELLED;

/*
* 如果当前节点是尾节点,则通过 CAS 设置前驱节点 prev 为尾节点。设置成功后,再利用 CAS 将
* prev 的 next 引用置空,断开与后继节点的联系,完成清理工作。
*/
if (node == tail && compareAndSetTail(node, pred)) {
/*
* 执行到这里,表明 pred 节点被成功设为了尾节点,这里通过 CAS 将 pred 节点的后继节点
* 设为 null。注意这里的 CAS 即使失败了,也没关系。失败了,表明 pred 的后继节点更新
* 了。pred 此时已经是尾节点了,若后继节点被更新,则是有新节点入队了。这种情况下,CAS
* 会失败,但失败不会影响同步队列的结构。
*/
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 根据条件判断是唤醒后继节点,还是将前驱节点和后继节点连接到一起
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {

Node next = node.next;
if (next != null && next.waitStatus <= 0)
/*
* 这里使用 CAS 设置 pred 的 next,表明多个线程同时在取消,这里存在竞争。
* 不过此处没针对 compareAndSetNext 方法失败后做一些处理,表明即使失败了也
* 没关系。实际上,多个线程同时设置 pred 的 next 引用时,只要有一个能设置成
* 功即可。
*/
compareAndSetNext(pred, predNext, next);
} else {
/*
* 唤醒后继节点对应的线程。这里简单讲一下为什么要唤醒后继线程,考虑下面一种情况:
* head node1 node2 tail
* ws=0 ws=1 ws=-1 ws=0
* +------+ prev +-----+ prev +-----+ prev +-----+
* | | <---- | | <---- | | <---- | |
* | | ----> | | ----> | | ----> | |
* +------+ next +-----+ next +-----+ next +-----+
*
* 头结点初始状态为 0,node1、node2 和 tail 节点依次入队。node1 自旋过程中调用
* tryAcquire 出现异常,进入 cancelAcquire。head 节点此时等待状态仍然是 0,它
* 会认为后继节点还在运行中,所它在释放同步状态后,不会去唤醒后继等待状态为非取消的
* 节点 node2。如果 node1 再不唤醒 node2 的线程,该线程面临无法被唤醒的情况。此
* 时,整个同步队列就回全部阻塞住。
*/
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
/*
* 通过 CAS 将等待状态设为 0,让后继节点线程多一次
* 尝试获取同步状态的机会
*/
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
/*
* 这里如果 s == null 处理,是不是表明 node 是尾节点?答案是不一定。原因之前在分析
* enq 方法时说过。这里再啰嗦一遍,新节点入队时,队列瞬时结构可能如下:
* node1 node2
* +------+ prev +-----+ prev +-----+
* head | | <---- | | <---- | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* node2 节点为新入队节点,此时 tail 已经指向了它,但 node1 后继引用还未设置。
* 这里 node1 就是 node 参数,s = node1.next = null,但此时 node1 并不是尾
* 节点。所以这里不能从前向后遍历同步队列,应该从后向前。
*/
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒 node 的后继节点线程
LockSupport.unpark(s.thread);
}

到这里,独占式获取同步状态的分析就讲完了。如果仅分析获取同步状态的大致流程,那么这个流程并不难。但若深入到细节之中,还是需要思考思考。这里对独占式获取同步状态的大致流程做个总结,如下:

  • 调用 tryAcquire 方法尝试获取同步状态
  • 获取成功,直接返回
  • 获取失败,将线程封装到节点中,并将节点入队
  • 入队节点在 acquireQueued 方法中自旋获取同步状态
  • 若节点的前驱节点是头节点,则再次调用 tryAcquire 尝试获取同步状态
  • 获取成功,当前节点将自己设为头节点并返回
  • 获取失败,可能再次尝试,也可能会被阻塞。这里简单认为会被阻塞。

上面的步骤对应下面的流程图:

释放同步状态

相对于获取同步状态,释放同步状态的过程则要简单的多,这里简单罗列一下步骤:

  • 调用 tryRelease(arg) 尝试释放同步状态
  • 根据条件判断是否应该唤醒后继线程

就两个步骤,下面看一下源码分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
/*
* 这里简单列举条件分支的可能性,如下:
* 1. head = null
* head 还未初始化。初始情况下,head = null,当第一个节点入队后,head 会被初始
* 为一个虚拟(dummy)节点。这里,如果还没节点入队就调用 release 释放同步状态,
* 就会出现 h = null 的情况。
*
* 2. head != null && waitStatus = 0
* 表明后继节点对应的线程仍在运行中,不需要唤醒
*
* 3. head != null && waitStatus < 0
* 后继节点对应的线程可能被阻塞了,需要唤醒
*/
if (h != null && h.waitStatus != 0)
// 唤醒后继节点,上面分析过了,这里不再赘述
unparkSuccessor(h);
return true;
}
return false;
}

共享模式

与独占模式不同,共享模式下,同一时刻会有多个线程获取共享同步状态。共享模式是实现读写锁中的读锁、CountDownLatch 和 Semaphore 等同步组件的基础,搞懂了,再去理解一些共享同步组件就不难了。

获取同步状态

共享类型的节点获取共享同步状态后,如果后继节点也是共享类型节点,当前节点则会唤醒后继节点。这样,多个节点线程即可同时获取共享同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
public final void acquireShared(int arg) {
// 尝试获取共享同步状态,tryAcquireShared 返回的是整型
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// 这里和前面一样,也是通过有限次自旋的方式获取同步状态
for (;;) {
final Node p = node.predecessor();
/*
* 前驱是头结点,其类型可能是 EXCLUSIVE,也可能是 SHARED.
* 如果是 EXCLUSIVE,线程无法获取共享同步状态。
* 如果是 SHARED,线程则可获取共享同步状态。
* 能不能获取共享同步状态要看 tryAcquireShared 具体的实现。比如多个线程竞争读写
* 锁的中的读锁时,均能成功获取读锁。但多个线程同时竞争信号量时,可能就会有一部分线
* 程因无法竞争到信号量资源而阻塞。
*/
if (p == head) {
// 尝试获取共享同步状态
// 获取失败:负数
// 成功:0,但是后续节点无法获取同步状态
// 成功:>0 , 同时后续节点可以获取同步状态,但是后续线程必须检查状态的可用性
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置头结点,如果后继节点是共享类型,唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

/**
* 这个方法做了两件事情:
* 1. 设置自身为头结点
* 2. 根据条件判断是否要唤醒后继节点
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 设置头结点
setHead(node);

/*
* 这个条件分支由 propagate > 0 和 h.waitStatus < 0 两部分组成。
* h.waitStatus < 0 时,waitStatus = SIGNAL 或 PROPAGATE。这里仅依赖
* 条件 propagate > 0 判断是否唤醒后继节点是不充分的,至于原因请参考下面
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
/*
* 节点 s 如果是共享类型节点,则应该唤醒该节点
* 至于 s == null 的情况前面分析过,这里不在赘述。
*/
if (s == null || s.isShared())
doReleaseShared();
}
}

/**
* 该方法用于在 acquires/releases 存在竞争的情况下,确保唤醒动作向后传播。
*/
private void doReleaseShared() {
/*
* 下面的循环在 head 节点存在后继节点的情况下,做了两件事情:
* 1. 如果 head 节点等待状态为 SIGNAL,则将 head 节点状态设为 0,并唤醒后继节点
* 2. 如果 head 节点等待状态为 0,则将 head 节点状态设为 PROPAGATE,保证唤醒能够正
* 常传播下去。关于 PROPAGATE 状态的细节分析,后面会讲到。
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
/*
* ws = 0 的情况下,这里要尝试将状态从 0 设为 PROPAGATE,保证唤醒向后
* 传播。setHeadAndPropagate 在读到 h.waitStatus < 0 时,可以继续唤醒
* 后面的节点。
*/
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

到这里,共享模式下获取同步状态的逻辑就分析完了,不过我这里只做了简单分析。相对于独占式获取同步状态,共享式的情况更为复杂。独占模式下,只有一个节点线程可以成功获取同步状态,也只有获取已同步状态节点线程才可以释放同步状态。但在共享模式下,多个共享节点线程可以同时获得同步状态,在一些线程获取同步状态的同时,可能还会有另外一些线程正在释放同步状态。所以,共享模式更为复杂。这里我的脑力跟不上了,没法面面俱到的分析。
最后说一下共享模式下获取同步状态的大致流程,如下:

  • 获取共享同步状态
  • 若获取失败,则生成节点,并入队
  • 如果前驱为头结点,再次尝试获取共享同步状态
  • 获取成功则将自己设为头结点,如果后继节点是共享类型的,则唤醒
  • 若失败,将节点状态设为 SIGNAL,再次尝试。若再次失败,线程进入等待状态

释放同步状态

释放共享状态主要逻辑在 doReleaseShared 中,doReleaseShared 上节已经分析过,这里就不赘述了。共享节点线程在获取同步状态和释放同步状态时都会调用 doReleaseShared,所以 doReleaseShared 是多线程竞争集中的地方。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

PROPAGATE 状态存在的意义

AQS 的节点有几种不同的状态,PROPAGATE 字面意义,即向后传播唤醒动作。
那么就有两个问题:

  • PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的?
  • 引入 PROPAGATE 状态是为了解决什么问题?

利用 PROPAGATE 传播唤醒动作

PROPAGATE 状态是用来传播唤醒动作的,那么它是在哪里进行传播的呢?答案是在setHeadAndPropagate方法中,这里再来看看 setHeadAndPropagate 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);

if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

注意看 setHeadAndPropagate 方法中那个长长的判断语句,其中有一个条件是h.waitStatus < 0,当 h.waitStatus = SIGNAL(-1) 或 PROPAGATE(-3) 是,这个条件就会成立。那么 PROPAGATE 状态是在何时被设置的呢?答案是在doReleaseShared方法中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {...}

// 如果 ws = 0,则将 h 状态设为 PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
...
}
}

再回到 setHeadAndPropagate 的实现,该方法既然引入了h.waitStatus < 0这个条件,就意味着仅靠条件propagate > 0判断是否唤醒后继节点线程的机制是不充分的。为啥?

引入 PROPAGATE 所解决的问题

PROPAGATE 的引入是为了解决一个 BUG – JDK-6801020,复现这个 BUG 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.Semaphore;

public class TestSemaphore {

private static Semaphore sem = new Semaphore(0);

private static class Thread1 extends Thread {
@Override
public void run() {
sem.acquireUninterruptibly();
}
}

private static class Thread2 extends Thread {
@Override
public void run() {
sem.release();
}
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10000000; i++) {
Thread t1 = new Thread1();
Thread t2 = new Thread1();
Thread t3 = new Thread2();
Thread t4 = new Thread2();
t1.start();
t2.start();
t3.start();
t4.start();
t1.join();
t2.join();
t3.join();
t4.join();
System.out.println(i);
}
}
}

根据 BUG 的描述消息可知 JDK 6u11,6u17 两个版本受到影响。那么,接下来再来看看引起这个 BUG 的代码 – JDK 6u17 中 setHeadAndPropagate 和 releaseShared 两个方法源码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}

// 和 release 方法的源码基本一样
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

下面来简单说明 TestSemaphore 这个类的逻辑。这个类持有一个数值为 0 的信号量对象,并创建了4个线程,线程 t1 和 t2 用于获取信号量,t3 和 t4 则是调用 release() 方法释放信号量。在一般情况下,TestSemaphore 这个类的代码都可以正常执行。但当有极端情况出现时,可能会导致同步队列挂掉。这里演绎一下这个极端情况,考虑某次循环时,队列结构如下:

  • 时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为0,并唤醒线程 t1。此时信号量数值为1。
  • 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回0。然后线程 t1 被切换,暂停运行。
  • 时刻3:线程 t4 调用 releaseShared 方法,因 head 的状态为0,所以 t4 不会调用 unparkSuccessor 方法。
  • 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。但因为 propagate = 0,线程 t1 无法调用 unparkSuccessor 唤醒线程 t2,t2 面临无线程唤醒的情况。因为 t2 无法退出等待状态,所以 t2.join 会阻塞主线程,导致程序挂住。

下面再来看一下修复 BUG 后的代码,根据 BUG 详情页显示,该 BUG 在 JDK 1.7 中被修复。这里找一个 JDK 7 较早版本(JDK 7u10)的代码看一下,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);

if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

在按照上面的代码演绎一下逻辑,如下:

  • 时刻1:线程 t3 调用 unparkSuccessor 方法,head 节点状态由 SIGNAL(-1) 变为0,并唤醒线程t1。此时信号量数值为1。
  • 时刻2:线程 t1 恢复运行,t1 调用 Semaphore.NonfairSync 的 tryAcquireShared,返回0。然后线程 t1 被切换,暂停运行。
  • 时刻3:线程 t4 调用 releaseShared 方法,检测到h.waitStatus = 0,t4 将头节点等待状态由0设为PROPAGATE(-3)。
  • 时刻4:线程 t1 恢复运行,t1 成功获取信号量,调用 setHeadAndPropagate。因 propagate = 0,propagate > 0 条件不满足。而 h.waitStatus = PROPAGATE(-3),所以条件h.waitStatus < 0成立。进而,线程 t1 可以唤醒线程 t2,完成唤醒动作的传播。

PROPAGATE 状态用在哪里,以及怎样向后传播唤醒动作的?

PROPAGATE 状态用在 setHeadAndPropagate。当头节点状态被设为 PROPAGATE 后,后继节点成为新的头结点后。若 propagate > 0 条件不成立,则根据条件h.waitStatus < 0成立与否,来决定是否唤醒后继节点,即向后传播唤醒动作。

引入 PROPAGATE 状态是为了解决什么问题?

引入 PROPAGATE 状态是为了解决并发释放信号量所导致部分请求信号量的线程无法被唤醒的问题。

Jeff-Eric wechat