【後端面經-Java】AQS詳解

2023-06-29 09:00:28

1. AQS是什麼?

AQS定義了一套多執行緒存取共用資源的同步器框架,許多同步類實現都依賴於它,如常用的ReentrantLock。
簡單來說,AQS定義了一套框架,來實現同步類

2. AQS核心思想

2.1 基本框架

AQS的核心思想是對於共用資源,維護一個雙端佇列來管理執行緒,佇列中的執行緒依次獲取資源,獲取不到的執行緒進入佇列等待,直到資源釋放,佇列中的執行緒依次獲取資源。
AQS的基本框架如圖所示:

2.1.1 資源state

state變數表示共用資源,通常是int型別。

  1. 存取方法
    state型別使用者無法直接進行修改,而需要藉助於AQS提供的方法進行修改,即getState()setState()compareAndSetState()等。
  2. 存取型別
    AQS定義了兩種資源存取型別:
    • 獨佔(Exclusive):一個時間點資源只能由一個執行緒佔用;
    • 共用(Share):一個時間點資源可以被多個執行緒共用。

2.1.2 CLH雙向佇列

CLH佇列是一種基於邏輯佇列非執行緒飢餓的自旋公平鎖,具體介紹可參考此篇部落格。CLH中每個節點都表示一個執行緒,處於頭部的節點獲取資源,而其他資源則等待。

  1. 節點結構
    Node類原始碼如下所示:
static final class Node {
    // 模式,分為共用與獨佔
    // 共用模式
    static final Node SHARED = new Node();
    // 獨佔模式
    static final Node EXCLUSIVE = null;        
    // 結點狀態
    // CANCELLED,值為1,表示當前的執行緒被取消
    // SIGNAL,值為-1,表示當前節點的後繼節點包含的執行緒需要執行,也就是unpark
    // CONDITION,值為-2,表示當前節點在等待condition,也就是在condition佇列中
    // PROPAGATE,值為-3,表示當前場景下後續的acquireShared能夠得以執行
    // 值為0,表示當前節點在sync佇列中,等待著獲取鎖
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;        

    // 結點狀態
    volatile int waitStatus;        
    // 前驅結點
    volatile Node prev;    
    // 後繼結點
    volatile Node next;        
    // 結點所對應的執行緒
    volatile Thread thread;        
    // 下一個等待者
    Node nextWaiter;
    
    // 結點是否在共用模式下等待
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    
    // 獲取前驅結點,若前驅結點為空,丟擲異常
    final Node predecessor() throws NullPointerException {
        // 儲存前驅結點
        Node p = prev; 
        if (p == null) // 前驅結點為空,丟擲異常
            throw new NullPointerException();
        else // 前驅結點不為空,返回
            return p;
    }
    
    // 無參構造方法
    Node() {    // Used to establish initial head or SHARED marker
    }
    
    // 構造方法
        Node(Thread thread, Node mode) {    // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    // 構造方法
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

Node的方法和屬性值如圖所示:

其中,

  • waitStatus表示當前節點在佇列中的狀態;
  • thread表示當前節點表示的執行緒;
  • prevnext分別表示當前節點的前驅節點和後繼節點;
  • nextWaiterd當存在CONDTION佇列時,表示一個condition狀態的後繼節點。
  1. waitStatus
    結點的等待狀態是一個整數值,具體的引數值和含義如下所示:
  • 1-CANCELLED,表示節點獲取鎖的請求被取消,此時節點不再請求資源;
  • 0,是節點初始化的預設值;
  • -1-SIGNAL,表示執行緒做好準備,等待資源釋放;
  • -2-CONDITION,表示節點在condition等待佇列中,等待被喚醒而進入同步佇列;
  • -3-PROPAGATE,當前執行緒處於共用模式下的時候會使用該欄位。

2.2 AQS模板

AQS提供一系列結構,作為一個完整的模板,自定義的同步器只需要實現資源的獲取和釋放就可以,而不需要考慮底層的佇列修改、狀態改變等邏輯。
使用AQS實現一個自定義同步器,需要實現的方法:

  • isHeldExclusively():該執行緒是否獨佔資源,在使用到condition的時候會實現這一方法;
  • tryAcquire(int):獨佔模式獲取資源的方式,成功獲取返回true,否則返回false;
  • tryRelease(int):獨佔模式釋放資源的方式,成功獲取返回true,否則返回false;
  • tryAcquireShared(int):共用模式獲取資源的方式,成功獲取返回true,否則返回false;
  • tryReleaseShared(int):共用模式釋放資源的方式,成功獲取返回true,否則返回false;

一般來說,一個同步器是資源獨佔模式或者資源共用模式的其中之一,因此tryAcquire(int)tryAcquireShared(int)只需要實現一個即可,tryRelease(int)tryReleaseShared(int)同理。
但是同步器也可以實現兩種模式的資源獲取和釋放,從而實現獨佔和共用兩種模式。

3. 原始碼分析

3.1 acquire(int)

acquire(int)是獲取原始碼部分的頂層入口,原始碼如下所示:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

這段程式碼展現的資源獲取流程如下:

  • tryAcquire()嘗試直接去獲取資源;獲取成功則直接返回
  • 如果獲取失敗,則addWaiter()將該執行緒加入等待佇列的尾部,並標記為獨佔模式;
  • acquireQueued()使執行緒阻塞在等待佇列中獲取資源,一直獲取到資源後才返回。

簡單總結就是:

  • 獲取資源;
  • 失敗就排隊;
  • 排隊要等待。

從上文的描述可見重要的方法有三個:tryAquire()addWaiter()acquireQueued()。下面將逐個分析其原始碼:

3.1.1 tryAcquire(int)

tryAcquire(int)是獲取資源的方法,原始碼如下所示:

protected boolean tryAcquire(int arg) {
      throw new UnsupportedOperationException();
}

該方法是一個空方法,需要自定義同步器實現,因此在使用AQS實現同步器時,需要重寫該方法。這也是「自定義的同步器只需要實現資源的獲取和釋放就可以」的體現。

3.1.2 addWaiter(Node.EXCLUSIVE)

addWaiter(Node.EXCLUSIVE)是將執行緒加入等待佇列的尾部,原始碼如下所示:

private Node addWaiter(Node mode) {
    //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共用)
    //aquire()方法是獨佔模式,因此直接使用Exclusive引數。
    Node node = new Node(Thread.currentThread(), mode);

    //嘗試快速方式直接放到隊尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    //上一步失敗則通過enq入隊。
    enq(node);
    return node;
}

首先,使用模式將當前執行緒構造為一個節點,然後嘗試將該節點放入隊尾,如果成功則返回,否則呼叫enq(node)將節點放入隊尾,最終返回當前節點的位置指標。
其中,enq(node)方法是將節點加入佇列的方法,原始碼如下所示:

private Node enq(final Node node) {
    for (;;) { // 無限迴圈,確保結點能夠成功入佇列
        // 儲存尾結點
        Node t = tail;
        if (t == null) { // 尾結點為空,即還沒被初始化
            if (compareAndSetHead(new Node())) // 頭節點為空,並設定頭節點為新生成的結點
                tail = head; // 頭節點與尾結點都指向同一個新生結點
        } else { // 尾結點不為空,即已經被初始化過
            // 將node結點的prev域連線到尾結點
            node.prev = t; 
            if (compareAndSetTail(t, node)) { // 比較結點t是否為尾結點,若是則將尾結點設定為node
                // 設定尾結點的next域為node
                t.next = node; 
                return t; // 返回尾結點
            }
        }
    }
}

3.1.3 acquireQueued(Node node, int arg)

這部分原始碼是將執行緒阻塞在等待佇列中,執行緒處於等待狀態,直到獲取到資源後才返回,原始碼如下所示:

// sync佇列中的結點在獨佔且忽略中斷的模式下獲取(資源)
final boolean acquireQueued(final Node node, int arg) {
    // 標誌
    boolean failed = true;
    try {
        // 中斷標誌
        boolean interrupted = false;
        for (;;) { // 無限迴圈
            // 獲取node節點的前驅結點
            final Node p = node.predecessor(); 
            if (p == head && tryAcquire(arg)) { // 前驅為頭節點並且成功獲得鎖
                setHead(node); // 設定頭節點
                p.next = null; // help GC
                failed = false; // 設定標誌
                return interrupted; 
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//
                //shouldParkAfterFailedAcquire只有當該節點的前驅結點的狀態為SIGNAL時,才可以對該結點所封裝的執行緒進行park操作。否則,將不能進行park操作。
                //parkAndCheckInterrupt首先執行park操作,即禁用當前執行緒,然後返回該執行緒是否已經被中斷
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued(Node node, int arg)方法的主要邏輯如下:

  • 獲取node節點的前驅結點,判斷前驅節點是不是頭部節點head,有沒有成功獲取資源。
  • 如果前驅結點是頭部節點head並且獲取了資源,說明自己應該被喚醒,設定該節點為head節點等待下一個獲得資源;
  • 如果前驅節點不是頭部節點或者沒有獲取資源,則判斷是否需要park當前執行緒,
    • 判斷前驅節點狀態是不是SIGNAL,是的話則park當前節點,否則不執行park操作;
  • park當前節點之後,當前節點進入等待狀態,等待被其他節點unpark操作喚醒。然後重複此邏輯步驟。

3.2 release(int)

release(int)是釋放資源的頂層入口方法,原始碼如下所示:

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 釋放成功
        // 儲存頭節點
        Node h = head; 
        if (h != null && h.waitStatus != 0) // 頭節點不為空並且頭節點狀態不為0
            unparkSuccessor(h); //釋放頭節點的後繼結點
        return true;
    }
    return false;
}

release(int)方法的主要邏輯如下:

  • 嘗試釋放資源,如果釋放成功則返回true,否則返回false
  • 釋放成功之後,需要呼叫unparkSuccessor(h)喚醒後繼節點。

下面介紹兩個重要的原始碼函數:tryRelease(int)unparkSuccessor(h)

3.2.1 tryRelease(int)

tryRelease(int)是釋放資源的方法,原始碼如下所示:

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

這部分是需要自定義同步器自己實現的,要注意的是返回值需要為boolean型別,表示釋放資源是否成功。

3.2.2 unparkSuccessor(h)

unparkSuccessor(h)是喚醒後繼節點的方法,原始碼如下所示:

private void unparkSuccessor(Node node) {
    //這裡,node一般為當前執行緒所在的結點。
    int ws = node.waitStatus;
    if (ws < 0)//置零當前執行緒所在的結點狀態,允許失敗。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一個需要喚醒的結點s
    if (s == null || s.waitStatus > 0) {//如果為空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 從後向前找。
            if (t.waitStatus <= 0)//從這裡可以看出,<=0的結點,都是還有效的結點。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//喚醒
}

這部分主要是查詢第一個還處於等待狀態的節點,將其喚醒;
查詢順序是從後往前找,這是因為CLH佇列中的prev鏈是強一致的,從後往前找更加安全,而next鏈因為addWaiter()方法和cancelAcquire()方法的存在,不是強一致的,因此從前往後找可能會出現問題。這部分的具體解釋可以參考參考文獻-1

3.3 acquireShared(int)和releaseShared(int)

3.3.1 acquireShared(int)

是使用共用模式獲取共用資源的頂層入口方法,原始碼如下所示:

public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
         doAcquireShared(arg);
}

流程如下:

  • 通過tryAcquireShared(arg)嘗試獲取資源,如果獲取成功則直接返回;
  • 如果獲取資源失敗,則呼叫doAcquireShared(arg)將執行緒阻塞在等待佇列中,直到被unpark()/interrupt()併成功獲取到資源才返回。

其中,tryAcquireShared(arg)是獲取共用資源的方法,也是需要使用者自己實現。

doAcquireShared(arg)是將執行緒阻塞在等待佇列中,直到獲取到資源後才返回,具體流程和acquireQueued()方法類似,
原始碼如下所示:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//加入佇列尾部
    boolean failed = true;//是否成功標誌
    try {
        boolean interrupted = false;//等待過程中是否被中斷過的標誌
        for (;;) {
            final Node p = node.predecessor();//前驅
            if (p == head) {//如果到head的下一個,因為head是拿到資源的執行緒,此時node被喚醒,很可能是head用完資源來喚醒自己的
                int r = tryAcquireShared(arg);//嘗試獲取資源
                if (r >= 0) {//成功
                    setHeadAndPropagate(node, r);//將head指向自己,還有剩餘資源可以再喚醒之後的執行緒
                    p.next = null; // help GC
                    if (interrupted)//如果等待過程中被打斷過,此時將中斷補上。
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }

            //判斷狀態,尋找安全點,進入waiting狀態,等著被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.3.2 releaseShared(int)

releaseShared(int)是釋放共用資源的頂層入口方法,原始碼如下所示:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//嘗試釋放資源
        doReleaseShared();//喚醒後繼結點
        return true;
    }
    return false;
}

流程如下:

  • 使用tryReleaseShared(arg)嘗試釋放資源,如果釋放成功則返回true,否則返回false;
  • 如果釋放成功,則呼叫doReleaseShared()喚醒後繼節點。

下面介紹一下doReleaseShared()方法,原始碼如下所示:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);//喚醒後繼
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head發生變化
            break;
    }
}

4. 面試問題模擬

Q:AQS是介面嗎?有哪些沒有實現的方法?看過相關原始碼嗎?

AQS定義了一個實現同步類的框架,實現方法主要有tryAquiretryRelease,表示獨佔模式的資源獲取和釋放,tryAquireSharedtryReleaseShared表示共用模式的資源獲取和釋放。
原始碼分析如上文所述。

參考資料

  1. Java並行之AQS詳解
  2. JUC鎖: 鎖核心類AQS詳解
  3. 從ReentrantLock的實現看AQS的原理及應用