AQS定義了一套多執行緒存取共用資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock。
簡單來說,AQS定義了一套框架,來實現同步類。
AQS的核心思想是對於共用資源,維護一個雙端佇列來管理執行緒,佇列中的執行緒依次獲取資源,獲取不到的執行緒進入佇列等待,直到資源釋放,佇列中的執行緒依次獲取資源。
AQS的基本框架如圖所示:
state變數表示共用資源,通常是int
型別。
getState()
、setState()
、compareAndSetState()
等。CLH佇列是一種基於邏輯佇列非執行緒飢餓的自旋公平鎖,具體介紹可參考此篇部落格。CLH中每個節點都表示一個執行緒,處於頭部的節點獲取資源,而其他資源則等待。
Node
類原始碼如下所示:static final class Node {
// 模式,分為共用與獨佔
// 共用模式
static final Node SHARED = new Node();
// 獨佔模式
static final Node EXCLUSIVE = null;
// 結點狀態
// CANCELLED,值為1,表示當前的執行緒被取消
// SIGNAL,值為-1,表示當前節點的後繼節點包含的執行緒需要執行,也就是unpark
// CONDITION,值為-2,表示當前節點在等待condition,也就是在condition佇列中
// PROPAGATE,值為-3,表示當前場景下後續的acquireShared能夠得以執行
// 值為0,表示當前節點在sync佇列中,等待著獲取鎖
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 結點狀態
volatile int waitStatus;
// 前驅結點
volatile Node prev;
// 後繼結點
volatile Node next;
// 結點所對應的執行緒
volatile Thread thread;
// 下一個等待者
Node nextWaiter;
// 結點是否在共用模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取前驅結點,若前驅結點為空,丟擲異常
final Node predecessor() throws NullPointerException {
// 儲存前驅結點
Node p = prev;
if (p == null) // 前驅結點為空,丟擲異常
throw new NullPointerException();
else // 前驅結點不為空,返回
return p;
}
// 無參構造方法
Node() { // Used to establish initial head or SHARED marker
}
// 構造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 構造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Node
的方法和屬性值如圖所示:
其中,
waitStatus
表示當前節點在佇列中的狀態;thread
表示當前節點表示的執行緒;prev
和next
分別表示當前節點的前驅節點和後繼節點;nextWaiter
d當存在CONDTION佇列時,表示一個condition狀態的後繼節點。1
-CANCELLED
,表示節點獲取鎖的請求被取消,此時節點不再請求資源;0
,是節點初始化的預設值;-1
-SIGNAL
,表示執行緒做好準備,等待資源釋放;-2
-CONDITION
,表示節點在condition等待佇列中,等待被喚醒而進入同步佇列;-3
-PROPAGATE
,當前執行緒處於共用模式下的時候會使用該欄位。AQS提供一系列結構,作為一個完整的模板,自定義的同步器只需要實現資源的獲取和釋放就可以,而不需要考慮底層的佇列修改、狀態改變等邏輯。
使用AQS實現一個自定義同步器,需要實現的方法:
isHeldExclusively()
:該執行緒是否獨佔資源,在使用到condition的時候會實現這一方法;tryAcquire(int)
:獨佔模式獲取資源的方式,成功獲取返回true
,否則返回false
;tryRelease(int)
:獨佔模式釋放資源的方式,成功獲取返回true
,否則返回false
;tryAcquireShared(int)
:共用模式獲取資源的方式,成功獲取返回true
,否則返回false
;tryReleaseShared(int)
:共用模式釋放資源的方式,成功獲取返回true
,否則返回false
;一般來說,一個同步器是資源獨佔模式或者資源共用模式的其中之一,因此tryAcquire(int)
和tryAcquireShared(int)
只需要實現一個即可,tryRelease(int)
和tryReleaseShared(int)
同理。
但是同步器也可以實現兩種模式的資源獲取和釋放,從而實現獨佔和共用兩種模式。
acquire(int)
是獲取原始碼部分的頂層入口,原始碼如下所示:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這段程式碼展現的資源獲取流程如下:
tryAcquire()
嘗試直接去獲取資源;獲取成功則直接返回addWaiter()
將該執行緒加入等待佇列的尾部,並標記為獨佔模式;acquireQueued()
使執行緒阻塞在等待佇列中獲取資源,一直獲取到資源後才返回。簡單總結就是:
從上文的描述可見重要的方法有三個:tryAquire()
、addWaiter()
、acquireQueued()
。下面將逐個分析其原始碼:
tryAcquire(int)
是獲取資源的方法,原始碼如下所示:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
該方法是一個空方法,需要自定義同步器實現,因此在使用AQS實現同步器時,需要重寫該方法。這也是「自定義的同步器只需要實現資源的獲取和釋放就可以」的體現。
addWaiter(Node.EXCLUSIVE)
是將執行緒加入等待佇列的尾部,原始碼如下所示:
private Node addWaiter(Node mode) {
//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共用)
//aquire()方法是獨佔模式,因此直接使用Exclusive引數。
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗則通過enq入隊。
enq(node);
return node;
}
首先,使用模式將當前執行緒構造為一個節點,然後嘗試將該節點放入隊尾,如果成功則返回,否則呼叫enq(node)
將節點放入隊尾,最終返回當前節點的位置指標。
其中,enq(node)
方法是將節點加入佇列的方法,原始碼如下所示:
private Node enq(final Node node) {
for (;;) { // 無限迴圈,確保結點能夠成功入佇列
// 儲存尾結點
Node t = tail;
if (t == null) { // 尾結點為空,即還沒被初始化
if (compareAndSetHead(new Node())) // 頭節點為空,並設定頭節點為新生成的結點
tail = head; // 頭節點與尾結點都指向同一個新生結點
} else { // 尾結點不為空,即已經被初始化過
// 將node結點的prev域連線到尾結點
node.prev = t;
if (compareAndSetTail(t, node)) { // 比較結點t是否為尾結點,若是則將尾結點設定為node
// 設定尾結點的next域為node
t.next = node;
return t; // 返回尾結點
}
}
}
}
這部分原始碼是將執行緒阻塞在等待佇列中,執行緒處於等待狀態,直到獲取到資源後才返回,原始碼如下所示:
// sync佇列中的結點在獨佔且忽略中斷的模式下獲取(資源)
final boolean acquireQueued(final Node node, int arg) {
// 標誌
boolean failed = true;
try {
// 中斷標誌
boolean interrupted = false;
for (;;) { // 無限迴圈
// 獲取node節點的前驅結點
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 前驅為頭節點並且成功獲得鎖
setHead(node); // 設定頭節點
p.next = null; // help GC
failed = false; // 設定標誌
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//
//shouldParkAfterFailedAcquire只有當該節點的前驅結點的狀態為SIGNAL時,才可以對該結點所封裝的執行緒進行park操作。否則,將不能進行park操作。
//parkAndCheckInterrupt首先執行park操作,即禁用當前執行緒,然後返回該執行緒是否已經被中斷
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued(Node node, int arg)
方法的主要邏輯如下:
node
節點的前驅結點,判斷前驅節點是不是頭部節點head,有沒有成功獲取資源。SIGNAL
,是的話則park當前節點,否則不執行park操作;park
當前節點之後,當前節點進入等待狀態,等待被其他節點unpark
操作喚醒。然後重複此邏輯步驟。release(int)
是釋放資源的頂層入口方法,原始碼如下所示:
public final boolean release(int arg) {
if (tryRelease(arg)) { // 釋放成功
// 儲存頭節點
Node h = head;
if (h != null && h.waitStatus != 0) // 頭節點不為空並且頭節點狀態不為0
unparkSuccessor(h); //釋放頭節點的後繼結點
return true;
}
return false;
}
release(int)
方法的主要邏輯如下:
true
,否則返回false
;unparkSuccessor(h)
喚醒後繼節點。下面介紹兩個重要的原始碼函數:tryRelease(int)
和unparkSuccessor(h)
。
tryRelease(int)
是釋放資源的方法,原始碼如下所示:
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
這部分是需要自定義同步器自己實現的,要注意的是返回值需要為boolean
型別,表示釋放資源是否成功。
unparkSuccessor(h)
是喚醒後繼節點的方法,原始碼如下所示:
private void unparkSuccessor(Node node) {
//這裡,node一般為當前執行緒所在的結點。
int ws = node.waitStatus;
if (ws < 0)//置零當前執行緒所在的結點狀態,允許失敗。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一個需要喚醒的結點s
if (s == null || s.waitStatus > 0) {//如果為空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 從後向前找。
if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒
}
這部分主要是查詢第一個還處於等待狀態的節點,將其喚醒;
查詢順序是從後往前找,這是因為CLH佇列中的prev
鏈是強一致的,從後往前找更加安全,而next
鏈因為addWaiter()
方法和cancelAcquire()
方法的存在,不是強一致的,因此從前往後找可能會出現問題。這部分的具體解釋可以參考參考文獻-1
是使用共用模式獲取共用資源的頂層入口方法,原始碼如下所示:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
流程如下:
tryAcquireShared(arg)
嘗試獲取資源,如果獲取成功則直接返回;doAcquireShared(arg)
將執行緒阻塞在等待佇列中,直到被unpark()
/interrupt()
併成功獲取到資源才返回。其中,tryAcquireShared(arg)
是獲取共用資源的方法,也是需要使用者自己實現。
而doAcquireShared(arg)
是將執行緒阻塞在等待佇列中,直到獲取到資源後才返回,具體流程和acquireQueued()
方法類似,
原始碼如下所示:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//加入佇列尾部
boolean failed = true;//是否成功標誌
try {
boolean interrupted = false;//等待過程中是否被中斷過的標誌
for (;;) {
final Node p = node.predecessor();//前驅
if (p == head) {//如果到head的下一個,因為head是拿到資源的執行緒,此時node被喚醒,很可能是head用完資源來喚醒自己的
int r = tryAcquireShared(arg);//嘗試獲取資源
if (r >= 0) {//成功
setHeadAndPropagate(node, r);//將head指向自己,還有剩餘資源可以再喚醒之後的執行緒
p.next = null; // help GC
if (interrupted)//如果等待過程中被打斷過,此時將中斷補上。
selfInterrupt();
failed = false;
return;
}
}
//判斷狀態,尋找安全點,進入waiting狀態,等著被unpark()或interrupt()
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
releaseShared(int)
是釋放共用資源的頂層入口方法,原始碼如下所示:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//嘗試釋放資源
doReleaseShared();//喚醒後繼結點
return true;
}
return false;
}
流程如下:
tryReleaseShared(arg)
嘗試釋放資源,如果釋放成功則返回true,否則返回false;doReleaseShared()
喚醒後繼節點。下面介紹一下doReleaseShared()
方法,原始碼如下所示:
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//喚醒後繼
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)// head發生變化
break;
}
}
Q:AQS是介面嗎?有哪些沒有實現的方法?看過相關原始碼嗎?
AQS定義了一個實現同步類的框架,實現方法主要有tryAquire
和tryRelease
,表示獨佔模式的資源獲取和釋放,tryAquireShared
和tryReleaseShared
表示共用模式的資源獲取和釋放。
原始碼分析如上文所述。