AQS實現原理

2023-07-13 15:00:34

在java.util.concurrent包中,我們經常會使用ReentrantLock,CyclicBarrier等工具類,但是我們往往對其內部的實現原理卻並不知曉。

本篇文章主要對上述工具類的核心實現AQS進行剖析,分析原理可以讓我們學習到大神的程式碼設計思維。

文章將從一下幾個方面分析:

1.AQS是什麼?

AbstractQueuedSynchronizer類就是我們通常說的AQS,抽象的佇列同步器。

其實我們學過作業系統原理都知道,所謂的同步,指的是多執行緒場景下通過某種機制,保證某段程式碼執行是執行緒獨享的,

我們把這段程式碼叫同步塊,而把這種機制叫同步。

在JAVA中,傳統的方式是使用synchronized關鍵字來實現同步,那麼其底層是基於C++實現的ObjectMonitor。

今天我們討論的AQS是JDK1.5之後,提供的一個能實現同步功能的抽象類。

通過該類的註釋,我們可以瞭解到其內部採用了FIFO佇列的資料結構來實現,互斥場景的資源申請和釋放的實現如下所示:

   Acquire:
       while (!tryAcquire(arg)) {
          enqueue thread if it is not already queued;
          possibly block current thread;
       }
  
   Release:
       if (tryRelease(arg))
          unblock the first queued thread;

而tryAquire()和tryRelease()方法都是需要子類去實現的。

換句話說,如果要使用AQS,那麼只需要繼承,然後實現如下方法來自定義資源獲取和釋放的邏輯就行了。

FIFO佇列的節點使用內部類Node來描述:

    static final class Node {

        // 節點狀態
        volatile int waitStatus;

        // 上一個節點
        volatile Node prev;

        // 下一節點
        volatile Node next;

        // 需要排隊的執行緒
        volatile Thread thread;
    }

其他程式碼我先省略。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // 雙向連結串列頭節點
    private transient volatile Node head;

    // 雙向連結串列為節點
    private transient volatile Node tail;

    // 資源
    private volatile int state;
}

 

2.申請互斥資源的原始碼分析。

2.1 分析acquire()方法的邏輯

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcuire()方法可以理解成獲取資源,由子類實現,我們不關注。

我把程式碼寫成下面這樣,可能好理解一些:

public final void acquire(int arg) {
    if(tryAcquire(arg)) {
        return;
    }
    Node n = addWaiter(Node.EXCLUSIVE);
    if(acquireQueued(n, arg))
        selfInterrupt();
}

 

public final void acquire(int arg) {
    // 如果獲取資源成功,直接返回
    if(tryAcquire(arg)) {
        return true;
    }
    // 未獲取到資源,將自己封裝成一個node新增到佇列尾部
    Node n = addWaiter(Node.EXCLUSIVE);
    // 有點複雜,下面慢慢分析
    if(acquireQueued(n, arg)) {
        // 自我中斷 先不關注。
        selfInterrupt();
    }
}

起碼,我們應該知道一點,該方法一旦返回了,那麼就意味著可以進入同步塊程式碼執行了。

2.2 分析addWaiter()方法

private Node addWaiter(Node mode) {
    // 以互斥模式建立一個node,waitStatus是0
    Node node = new Node(Thread.currentThread(), mode);
    // 其實該段程式碼跟下面的enq方法差不多
    Node pred = tail;
    // 如果該佇列已經有為節點
    if (pred != null) {
        // 當前node上一個node指向為尾節點
        node.prev = pred;
        // cas修改尾節點
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 死迴圈保證新增成功
    enq(node);
    return node;
}


private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 如果當前佇列沒有節點,建立一個虛擬節點作為頭和尾,該節點的thread == null, waitStatus是0
            // 通過cas操作保證只有一個執行緒修改成功
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

其實上述方法就是保證了並行情況下node一定能正確加入到node中,而且如果是空連結串列,會增加一個虛擬的head節點

圖解一下:

(建立head虛擬節點)

 

 將新節點指向tail

 通過cas操作修改tail指向新節點

 如果修改成功,將修改前的tail的next指向新節點

 

2.3 分析aquireQueued方法

// 看這個方法的時候我建議不要關注其中的臨時變數
// 我們只要知道,這個方法裡面有個死迴圈,不管怎樣,只有return了,才能執行業務中定義的同步程式碼塊。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // 唯一的返回點
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 我把程式碼改寫成下面這樣 可能會好理解一些
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 只有前繼節點是head的時候具有獲取資源的資格,如果獲取成功則直接將當前node設定成head
            if (p == head && tryAcquire(arg)) {
                // 可能在排隊後未阻塞前執行,也可能在阻塞被喚醒後執行
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // 唯一的返回點
                return interrupted;
            }
            // 判斷是否該阻塞自己
            if (!shouldParkAfterFailedAcquire(p, node)){
               continue;
            }
            // 進行阻塞,並且在被喚醒之後返回執行緒的中斷狀態
            if(!parkAndCheckInterrupt()) {
                continue;
            }
            interrupted = true;
        }
    } finally {
        // 先別關注。
        if (failed)
            cancelAcquire(node);
    }
}

總之,一旦該方法返回了,就意味著執行緒獲取資源成功了。

下面圖解獲取資源成功後,做的修改:

 

 

 

 當然這裡的waitStatus不一定是0。

2.4 分析shouldParkAfterFailedAcquire()方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果上一節點狀態是-1,直接返回true
    if (ws == Node.SIGNAL)
        // 返回,並觸發外層阻塞執行緒
        return true;
    if (ws > 0) {
        // 如果上一個節點狀態大於0,其實就是1表示被取消了
        // 往前找waitStatus正常的節點
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // cas修改上一個節點為狀態為-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    // 返回進入下一個迴圈
    return false;
}

其實邏輯很簡單,就是修改自己的prev指向往前尋找的有效節點,並在阻塞前,將prev指向的有效節點waitStatus設定為-1。

問題來了?這個程式碼沒有並行問題嗎?

do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;

答案是沒有並行問題,因為每個節點都是往前尋找,理論上講每個執行緒遍歷的節點不一樣。

我們來圖解一下這個迴圈修改的過程:

假設執行緒A(waitStatus = 0),執行緒B(waitStatus = 1),執行緒C(waitStatus = 0 為當前執行緒)

 下圖示識了迴圈往前探測並修改參照關係

 下圖就是修改了pred節點的waitStatus = -1 (下一次迴圈到來的時候)

 

此時中間執行緒B的節點其實沒有參照再指向他了。

如果成功修改執行緒A的waitStatus為-1之後,下一次迴圈到來,該方法就返回了true,執行緒就阻塞了。

 其實看到這裡,我們可以將虛擬的head節點就當做是正在使用資源的執行緒表示(個人觀點哈)

 互斥資源申請的原始碼分析就結束了。

做一個小總結:

public final void acquire(int arg) {
    // 如果爭搶資源成功直接返回直接業務的同步程式碼塊
    if (tryAcquire(arg)) {
        return;
    }
    // 迴圈+cas保證向佇列中新增當前執行緒的node成功
    Node n = addWaiter(Node.EXCLUSIVE);
    // 死迴圈,爭搶資源,或者阻塞,或者喚醒之後繼續爭搶資源,直到搶資源成功後返回。
    if(acquireQueued(n, arg)) {
        selfInterrupt();
    }
    // 未被中斷的搶到資源
}

3.釋放互斥資源的原始碼分析。

我們直接分析release()方法:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 一旦釋放資源,此處就需要考慮並行問題了
        Node h = head;
        // head節點是虛擬節點,不可能是取消狀態所以這裡的判斷可以理解為
        // 頭結點不為空而且頭結點的waitStatus = -1執行unparkSuccessor方法
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        // h == null 或者 h.waitStatus = 0說明沒有後繼節點需要喚醒,
        // 如果此時正好head後面一個node正在試圖修改head的狀態改成-1是能改成功的
        // 但是由於之前分析的acquireQueued方法是一個死迴圈,哪怕head被修改成-1,
        // 但是由於該回圈會先搶鎖所以也就不存線上程改了狀態會park的問題。
        // h == null說明並無執行緒參與競爭
        return true;
    }
    return false;
}

 

其實關鍵就是unparkSuccessor()方法

private void unparkSuccessor(Node node) {
    // 進入該方法說明資源已經被釋放了。
    int ws = node.waitStatus;
    // 如果ws小於0,修改為0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 傳入node就是head
    // s指向head下一節點
    Node s = node.next;
    // 如果沒有後繼節點說明有執行緒已經搶到資源
    // 如果後繼節點被取消了
    if (s == null || s.waitStatus > 0) {
        // 假設沒有節點需要喚醒
        s = null;
        // 從後往前找,找到距離head最近的節點喚醒
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

其實執行該方法的時候是存在並行的情況的。

我理解這個地方從後往前找是為啥呢?能不能從前往後找?

 

這個問題,我也不知道。。。。