AQS 是一個同步框架,關於同步在作業系統(一)—— 程序同步 中對程序同步做了些概念性的介紹,我們瞭解到程序(執行緒同理,本文基於 JVM 講解,故下文只稱執行緒)同步的工具有很多:Mutex、Semaphore、Monitor。但是Mutex 和 Semaphore 作為低階通訊存在不少缺點,Monitor 機制解決了上面部分缺憾,但是仍然存在問題,AQS 的出現很好的解決了這些問題。
Monitor 解決了上述幾個問題,但在 HotSpot 中底層是基於 Mutex 做的執行緒同步,在 1.6 之前且還沒有進行優化,每次鎖競爭都需要經歷兩次上下文切換嚴重影響效能。第二個問題是 HotSpot 實現的是精簡的 Mesa 語意,不支援多個條件變數。
AQS 即一個類,java.util.concurrent.AbstractQueuedSynchronized.Class,這個類作為一個構造同步器的框架。AQS 本身並不提供 API 給程式設計師用於直接的同步控制,它是一個抽象類,通過實現它的抽象方法來構建同步工具,如 ReentrantLock、CountDownLatch 等。也就是說它不是一個面向業務開發使用的工具,是構建同步器所複用的一套機制抽取出來形成的框架,這個框架我們自己也可以通過實現它的抽象方法來構建自己的同步器。
其實通過觀察同步器的各種實現都是相似的,我們會發現鎖的實現通常需要以下三要素:
狀態管理在號誌中是整型值,在 Mutex 中就是 Owner,在 Synchronized (Monitor) 中也是 Owner,這些機制都有對應的操作來加鎖和解鎖,通常加鎖/解鎖操作也對應著執行緒的阻塞(等待)/喚醒,這些執行緒沒有獲取到鎖後需要等待,有的實現是自旋,有的實現是掛起。不論是 Synchronized 還是 AQS 都有等待佇列供未獲取到鎖的執行緒進入,直到鎖釋放。
同理,AQS 所做的主要的三件事也就是上面三件,只是每一項的實現細節可能不同,支援的功能更廣泛。
下文會先列出 AQS 的設計,也就是實現了哪些特性,接下來就會通過看原始碼和圖示來了解這些設計的具體實現。
獨佔模式即同一時刻只能有一個執行緒可以通過阻塞點,共用模式下可以同時有多個執行緒在執行。
以上都是 AQS 需要支援的功能,基於模板模式的設計,AQS 提供了以下方法供子類繼承重新實現:
關於同步器的實現思路,首先需要有一個狀態來標明鎖是否可用,在 AQS 的實現中,其維護了一個變數 state,這個變數使用 volatile 關鍵字進行標識,保證其線上程之間的可見性。加鎖解鎖操作簡化來看就是隻需要把這個狀態更改,且標明當前執行緒佔有鎖。在獨佔模式下,加鎖操作必須互斥,也就是在同一時刻只能有一個執行緒加鎖成功,AQS 使用 CAS 原子指令來保證 State 的互斥存取。在一個執行緒成功加鎖(改變鎖的狀態 State 的值,該值具體如何改變取決於同步工具如何實現)之後,其他執行緒嘗試 CAS 則會加鎖失敗,那麼加鎖失敗的執行緒該如何呢?這些執行緒可以自旋等待或者阻塞,AQS 提供 CLH 佇列將這些阻塞的執行緒管理起來,CLH 是一個先進先出的佇列,加鎖失敗的執行緒會被進入佇列阻塞。因此加鎖和解鎖操作並不僅僅是簡單的修改鎖狀態,在這之後還需要維護佇列。AQS 的主要原理也就是圍繞著佇列進行入,加鎖解鎖功能由繼承 AQS 的同步工具實現,在呼叫加鎖操作之後,AQS 來維護執行緒入隊,並且將執行緒阻塞,在呼叫解鎖操作後,AQS 將佇列中的執行緒按規則出隊並且喚醒。
static final class Node {
static final Node SHARED = new Node(); // 共用模式下的節點
static final Node EXCLUSIVE = null; // 獨佔模式下的節點
static final int CANCELLED = 1; // 被取消了的狀態
static final int SIGNAL = -1; // 處於park() 狀態等待被unpark()喚醒的狀態
static final int CONDITION = -2; // 處於等待 condition 的狀態
static final int PROPAGATE = -3; // 共用模式下需要繼續傳播的狀態
volatile int waitStatus; // 當前節點的狀態
volatile Node prev; // 前驅節點指標
volatile Node next; // 後繼節點指標
volatile Thread thread; // 搶佔鎖的執行緒
Node nextWaiter; // 指向下一個也處於等待的節點
}
上面提到 AQS 有一個阻塞佇列,這個佇列的具體實現是雙向連結串列,佇列中的節點是對執行緒進行封裝後的 Node 類,這個類的每個欄位含義如下:
為了更好的講解原理,下面直接模擬執行緒競爭鎖的場景來進行分析,我們首先看兩個執行緒在獨佔鎖模式下加鎖和解鎖的原始碼,再看共用模式下的場景。
場景:執行緒 T1 和執行緒 T2 競爭鎖
acquire(int arg) 是獨佔模式下加鎖的入口方法
public final void acquire(int arg) {
// tryAcquire(arg) 是繼承 AQS 實現在獨佔模式下的同步工具需要重寫的方法。當這個方法返回true,就表示當前執行緒獲得了鎖。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
流程如下:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
private Node addWaiter(Node mode) {
// 根據當前執行緒範例化一個獨佔式的 Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
/**
* 1. 將尾部賦給一個臨時變數,注意此時tail為空,因為tail此時還沒有指向Node物件
* 2. tail == null,因此執行 enq(node) 方法,這個node是建立的當前執行緒的node
*/
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 這個CAS方法的含義是,判斷當前AQS的尾部節點是pred,如果是則重新設定為當前node
// 設定成功,則退出
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果掛載隊尾節點也存在競爭 則使用無限CAS自旋方式設定隊尾
enq(node);
return node;
}
對上述流程第4點拆解來看,T2 首先要執行addWaiter(Node.EXCLUSIVE)進行入隊,我們看下執行流程:
private Node enq(final Node node) {
// 死迴圈判斷
for (;;) {
/**
* 第一次迴圈:
* tail賦值給臨時變數t, 注意此時tail仍然是null, 進入if塊
* 呼叫compareAndSetHead(new Node()) 建立了一個新的節點 ,這個節點的結構如下:
* Node() { // Used to establish initial head or SHARED marker
* }
* 該節點中的執行緒為 NULL,也就是我們下文所稱的啞節點
* 此時佇列裡有了第一個Node
*/
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
/**
* 第二次迴圈:在第一次迴圈時head賦給了tail,此時tail 不為空,
* 進入else塊,將執行緒T1入隊,也就是維護連結串列關係
*/
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq() 算是上述整個流程的一個分支,為後續講解更清楚,有必要先詳細講解此方法,畫出入隊情形下的佇列圖,流程如下:
下圖是 T2 入隊成功後佇列圖:
對上述流程第4點進行拆解,執行完畢 addWaiter() 返回當前節點 T2 Node,接著就進入acquireQueued 方法,我們來看該方法的流程:
final boolean acquireQueued(final Node node, int arg) {
// 標記自己是否拿到了資源 true為沒有獲取到。
boolean failed = true;
try {
// 標記等待過程中是否被中斷過
boolean interrupted = false;
for (;;) {
// 拿到前驅節點
final Node p = node.predecessor();
// 判斷上一個節點是否是頭部,即當前節點為佇列中第二個節點,tryAcquire()嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
// 將自己設定為頭結點
setHead(node);
// 斷開原先的啞節點與當前節點的next連線
p.next = null; // help GC
// 標記已經獲取到節點
failed = false;
// 返回執行緒中斷標記
return interrupted;
}
/**
* 判斷在一次自旋加鎖失敗後是否需要睡眠
* 自旋第一次時shouldParkAfterFailedAcquire(p, node)返回false, 不會進入if塊
* 自旋第二次時,shouldParkAfterFailedAcquire(p, node)返回true,
* 此時進入parkAndCheckInterrupt()方法,此時執行緒阻塞在 parkAndCheckInterrupt()
* 如果執行緒休息過程中被中斷過(Thread.interrupted();), interrupted 返回true
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
該方法有兩個標記,一個是 failed 標記是否拿到了資源,注意預設為 true 的時候是沒有拿到,另一個是中斷標記 interrupted 預設為 false, 該標記的含義不是該執行緒是否已經被打斷過,而是是否需要被打斷。我們先看 for 迴圈中的邏輯再看 finally 塊。
private void setHead(Node node) {
head = node;
// 當前節點的執行緒欄位置為空,也就是這個節點替代了原先的啞節點變成了新的啞節點
node.thread = null;
// 斷開與原啞節點的prev連線
node.prev = null;
}
/**
* 檢查當前節點並更新狀態,返回是否需要被阻塞
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status 前置節點
* @param node the node 當前節點
* @return {@code true} if thread should block 是否需要被阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前置節點的狀態,ws預設值為0
int ws = pred.waitStatus;
// SIGNAL預設值為1
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 前置節點處於取消狀態
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 如果前置節點不是正常的等待狀態那麼就繼續往前找直到找到一個正在等待裝填的節點。將其後置節點斷開接上當前節點。GC會回收一堆相互參照又沒有外部參照的節點。
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
接上述流程,T2 執行緒獲取資源失敗,因而走此方法邏輯,此時傳進該方法的引數 p 是 T2 Node 的前置節點,也就是啞節點(注意因沒獲取到資源沒有進入if塊,啞節點還是最先建立的那個啞節點)。這個方法就是線上程沒有獲取資源的情況下不斷自旋直到進入阻塞狀態或者是加鎖成功,shouldParkAfterFailedAcquire 就是返回一個布林值判斷當前執行緒是否需要進行阻塞。
流程如下:
private final boolean parkAndCheckInterrupt() {
// 執行緒阻塞在這,被打斷後才會往下執行, 此時打斷標記為 true
LockSupport.park(this);
// 返回打斷標記並重置為false
return Thread.interrupted();
}
這個方法比較簡單,就是在上一個方法判斷要阻塞之後呼叫這個方法,通過執行 park() 方法將執行緒阻塞在此處,直到有另一個執行緒將其喚醒。喚醒的方式有兩種,一是呼叫 unpark() 方法,二是使用 interrupt() 方法置中斷標記為 true。T2 Node 阻塞在此處被喚醒後就繼續往下走執行 Thread.interrupted(), 該方法會重置中斷標記並且給出是否被中斷,因此此方法最終會返回一個 bool 值,表明該執行緒是否需要被打斷。
T2 Node 執行到此處則一直阻塞在這了,為了講清楚這些細節,我們按實際場景講述,這個執行緒就阻塞在這了, T2 Node 被喚醒必然是另一個執行緒對其執行了 unpark() 或者 interrupt() 方法,我們先來看呼叫 unpark() 的情況,也就是執行緒 T1 解鎖的流程。
書接上回,T2 阻塞住了,此時 T1 執行完畢臨界區,開始進行解鎖,解鎖操作首先是恢復鎖狀態 State 為可用,其中是否重入等細節由繼承AQS 的同步工具實現(待講 ReentrantLock 再講此處細節),此處略過不表,我們重點關注 AQS 的鎖釋放原理,解鎖過程(獨佔模式下)最終會呼叫到 AQS 的 release(int arg) 方法,流程如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 頭結點存在 且頭結點狀態不為0
// 如果頭結點不為初始化狀態則喚醒佇列下一個等待的執行緒
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
* 獲取當前節點狀態
*/
int ws = node.waitStatus;
// 該節點狀態是有效的 就將其設定為初始化狀態
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* 從尾節點往前找, 找到一個waitStatus 小於0 的Node
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
此時流程如下:
上面的解鎖流程的最後,T2 執行緒被喚醒,繼續往下執行, 注意這裡我看其他部落格說的有點問題,在被 unpark 喚醒的情況下是不會修改中斷標記的,因此這裡返回的是 false,所以回到 acquireQueued 方法中並不會執行下面這行程式碼,關於 unpark() 和 interrupt() 我單獨開篇部落格講下。
T2 執行緒被喚醒後就開始有了競爭鎖的資格,這時候 for 迴圈又開始執行,即再次競爭鎖,假如這次加鎖成功(加鎖失敗就又繼續阻塞,咱就沒必要繼續講了,為了閉環我們假設這次加鎖成功),那麼返回中斷標記 interrupted,這個值為 false。
此時也就完成了鎖的更替 由 T1 變成了 T2,整個環節就閉環了,在這上面的講解過程中沒有引入更多的執行緒是為了避免討論複雜化,在 T3、T4 甚至更多執行緒來臨時同樣是遵循的上述程式碼流程,只是因為多執行緒並行,這其中哪個執行緒能獲取到鎖、連結串列的維護是否正確,喚醒的時候喚醒的是哪個執行緒這些事並不可控,因而也沒有必要一個個場景來複現,只要弄清楚原理想必足夠了。
研讀原始碼過程中發現所需時間耗費過長,在這個以敏捷為王的時代,偶爾也疑慮如此費勁周章地研習原始碼能有何收穫,成本與收穫似不成正比。故而 AQS 的原始碼暫且看到這,想必知曉原理已足夠,後續若有閒思再補充共用式加鎖和解鎖流程細節。