Java

2023-08-29 06:01:31

Java - JUC核心類AbstractQueuedSynchronizer(AQS)底層實現


 

 一.  AQS內部結構介紹

JUC是Java中一個包   java.util.concurrent 。在這個包下,基本存放了Java中一些有關並行的類,包括並行工具,並行集合,鎖等。

AQS(抽象佇列同步器)是JUC下的一個基礎類,大多數的並行工具都是基於AQS實現的。

AQS本質並沒有實現太多的業務功能,只是對外提供了三點核心內容,來幫助實現其他的並行內容。

三點核心內容:

  • int state
    • 比如ReentrantLock或者ReentrantReadWriteLock, 它們獲取鎖的方式,都是對state變數做修改實現的。
    • 比如CountDownLatch基於state作為計數器,同樣的Semaphore也是用state記錄資源個數。
  • Node物件組成的雙向連結串列(AQS中)
    • 比如ReentrantLock,有一個執行緒沒有拿到鎖資源,當執行緒需要等待,則需要將執行緒封裝為Node物件,將Node新增到雙向連結串列,將執行緒掛起,等待即可。
  • Node物件組成的單向連結串列(AQS中的ConditionObject類中)
    • 比如ReentrantLock,一個執行緒持有鎖資源時,執行了await方法(類比synchronized鎖執行物件的wait方法),此時這個執行緒需要封裝為Node物件,並新增到單向連結串列。

二.  Lock鎖和AQS關係

ReentrantLock就是基於AQS實現的。ReentrantLock類中維護這個一個內部抽象類Sync,他繼承了AQS類。ReentrantLock的lock和unlock方法就是呼叫的Sync的方法。

AQS流程(簡述)
1. 當new了一個ReentrantLock時,AQS預設state值為0, head 和 tail 都為null;
2. A執行緒執行lock方法,獲取鎖資源。
3. A執行緒將state通過cas操作從0改為1,代表獲取鎖資源成功。
4. B執行緒要獲取鎖資源時,鎖資源被A執行緒持有。
5. B執行緒獲取鎖資源失敗,需要新增到雙向連結串列中排隊。
6. 掛起B執行緒,等待A執行緒釋放鎖資源,再喚醒掛起的B執行緒。
7. A執行緒釋放鎖資源,將state從1改為0,再喚醒head.next節點。
8. B執行緒就可以重新嘗試獲取鎖資源。
注: 修改AQS雙向連結串列時要保證一個私有屬性變化和兩個共有屬性變化,只需要讓tail變化保證原子性即可。不能先改tail(會破壞雙向連結串列)

三.  AQS - Lock鎖的tryAcquire方法

ReentrantLock中的lock方法實際是執行的Sync的lock方法。

Sync是一個抽象類,繼承了AQS

Sync有兩個子類實現:

  • FairSync: 公平鎖
  • NonFairSync: 非公平鎖

Sync的lock方法實現:

//  非公平鎖
final void lock() {
    //  CAS操作,嘗試將state從0改為1
    //  成功就拿到鎖資源, 失敗執行acquire方法
    if (compareAndSetState(0, 1))
     // 成功就設定互斥鎖的為當前執行緒擁有
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

//  公平鎖
final void lock() {
    acquire(1);
}

如果CAS操作沒有成功,需要執行acquire方法走後續

acquire方法是AQS提供的,公平和非公平都是走的這個方法

public final void acquire(int arg) {
    //  1. tryAcquire方法: 再次嘗試拿鎖
    //  2. addWaiter方法: 沒有獲取到鎖資源,去排隊
    //  3. acquireQueued方法:掛起執行緒和後續被喚醒繼續獲取鎖資源的邏輯
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
     // 如果這個過程中出現中斷,在整個過程結束後再自我中斷 
        selfInterrupt();
}

在AQS中tryAcquire是沒有具體實現邏輯的,AQS直接在tryAcquire方法中丟擲異常

在公平鎖和非公平鎖中有自己的實現。

  • 非公平鎖tryAcquire方法
//  非公平鎖
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

//  非公平鎖再次嘗試拿鎖 (注:該方法屬於Sync類中)
final boolean nonfairTryAcquire(int acquires) {
    //  獲取當前執行緒物件 
    final Thread current = Thread.currentThread();
    //  獲取state狀態
    int c = getState();
    //  state是不是沒有執行緒持有鎖資源,可以嘗試獲取鎖
    if (c == 0) {
        //  再次CAS操作嘗試修改state狀態從0改為1
        if (compareAndSetState(0, acquires)) {
            //  成功就設定互斥鎖的為當前執行緒擁有
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //  鎖資源是否被當前執行緒所持有 (可重入鎖)
    else if (current == getExclusiveOwnerThread()) {
        //  持有鎖資源為當前, 則對state + 1
        int nextc = c + acquires;
        //  健壯性判斷
        if (nextc < 0) // overflow
            //  超過最大鎖重入次數會拋異常(機率很小,理論上存在)
            throw new Error("Maximum lock count exceeded");
        //  設定state狀態,代表鎖重入成功
        setState(nextc);
        return true;
    }
    return false;
}
  • 公平鎖tryAcquire方法
//  公平鎖
protected final boolean tryAcquire(int acquires) {
    //  獲取當前執行緒物件 
    final Thread current = Thread.currentThread();
    //  獲取state狀態
    int c = getState();
    //  state是不是沒有執行緒持有鎖資源
    if (c == 0) {
        //  當前鎖資源沒有被其他執行緒持有
        //  hasQueuedPredecessors方法: 鎖資源沒有被持有,進入佇列排隊
        //  排隊規則: 
        //  1. 檢查佇列沒有執行緒排隊,搶鎖。 
        //  2. 檢查佇列有執行緒排隊,檢視當前執行緒是否排在第一位,如果是搶鎖,否則入佇列(注:該方法只是判斷,沒有真正入佇列)
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            //  再次CAS操作嘗試, 成功就設定互斥鎖的為當前執行緒擁有
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //  鎖資源是否被當前執行緒所持有 (可重入鎖)
    else if (current == getExclusiveOwnerThread()) {
        //  持有鎖資源為當前, 則對state + 1
        int nextc = c + acquires;
        //  健壯性判斷
        if (nextc < 0)
            //  超過最大鎖重入次數會拋異常(機率很小,理論上存在)
            throw new Error("Maximum lock count exceeded");
        //  設定state狀態,代表鎖重入成功
        setState(nextc);
        return true;
    }
    return false;
}

四.  AQS的addWaiter方法

 addWaiter方法,就是將當前執行緒封裝為Node物件,並且插入到AQS的雙向連結串列。

//  執行緒入佇列排隊
private Node addWaiter(Node mode) {
    //  將當前物件封裝為Node物件  
    //  Node.EXCLUSIVE 表示互斥  Node.SHARED 表示共用
    Node node = new Node(Thread.currentThread(), mode);
    // 獲取tail節點
    Node pred = tail;
    //  判斷雙向連結串列佇列有沒有初始化
    if (pred != null) {
        //  將當前執行緒封裝的Node節點prev屬性指向tail尾節點
        node.prev = pred;
        //  通過CAS操作設定當前執行緒封裝的Node節點為尾節點
        if (compareAndSetTail(pred, node)) {
            //  成功則將上一個尾節點的next屬性指向當前執行緒封裝的Node節點
            pred.next = node;
            return node;
        }
    }
    //  沒有初始化head 和 tail 都等於null
    //  enq方法: 插入雙向連結串列和初始化雙向連結串列
    enq(node);
    //  完成節點插入
    return node;
}

//  插入雙向連結串列和初始化雙向連結串列
private Node enq(final Node node) {
    //  死迴圈  
    for (;;) {
        //  獲取當前tail節點
        Node t = tail;
        //  判斷尾節點是否初始
        if (t == null) { // Must initialize
            //  通過CAS操作初始化初始化一個虛擬的Node節點,賦給head節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //  完成當前執行緒Node節點加入AQS雙向連結串列的過程
            //  當前執行緒封裝的Node的上一個prev屬性指向tail節點
            //  流程: 1. prev(私有)  --->  2. tail(共有)  ---> 3. next (共有)
            node.prev = t;
            //  通過CAS操作修改tail尾節點指向當前執行緒封裝的Node
            if (compareAndSetTail(t, node)) {
                //  將當前執行緒封裝的Node節點賦給上一個Node的下一個next屬性
                t.next = node;
                return t;
            }
        }
    }
}

五.  AQS的acquireQueued方法

acquireQueued方法主要就是執行緒掛起以及重新嘗試獲取鎖資源的地方

重新獲取鎖資源主要有兩種情況:

  • 上來就排在head.next,就回去嘗試拿鎖
  • 喚醒之後嘗試拿鎖
//  當前執行緒Node新增到AQS佇列後續操作
final boolean acquireQueued(final Node node, int arg) {
    //  標記,記錄拿鎖狀態   失敗
    boolean failed = true;
    try {
        // 中斷狀態 
        boolean interrupted = false;
        //  死迴圈
        for (;;) {
            //  獲取當前節點的上一個節點    prev
            final Node p = node.predecessor();
            //  判斷當前節點是否是head,是則代表當前節點排在第一位
            //  如果是第一位,執行tryAcquire方法嘗試拿鎖
            if (p == head && tryAcquire(arg)) {
                //  都成功,代表拿到鎖資源
                //  將當前執行緒Node設定為head節點,同時將Node的thread 和 prev屬性設定為null
                setHead(node);
                //  將上一個head的next屬性設定為null,等待GC回收
                p.next = null; // help GC
                //  拿鎖狀態  成功
                failed = false;
                //  返回中斷狀態
                return interrupted;
            }
            //  沒有獲取到鎖 --- 嘗試掛起執行緒
            //  shouldParkAfterFailedAcquire方法: 掛起執行緒前的準備
            //  parkAndCheckInterrupt方法: 掛起當前執行緒
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //  設定中斷執行緒狀態
                interrupted = true;
        }
    } finally {
        //  取消節點
        if (failed)
            cancelAcquire(node);
    }
}

//  檢查並更新無法獲取鎖節點的狀態
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //  獲取上一個節點的ws狀態
    /**
    * SIGNAL(-1)   表示當前節點釋放鎖的時候,需要喚醒下一個節點。或者說後繼節點在等待當前節點喚醒,後繼節點入隊時候,會將前驅節點更新給signal。
    * CANCELLED(1)  表示當前節點已取消排程。當timeout或者中斷情況下,會觸發變更為此狀態,進入該狀態後的節點不再變化。
    * CONDITION(-2)  當其他執行緒呼叫了condition的signal方法後,condition狀態的節點會從等待佇列轉移到同步佇列中,等待獲取同步鎖。
    * PROPAGATE(-3)   表示共用模式下,前驅節點不僅會喚醒其後繼節點,同時也可能喚醒後繼的後繼節點。
    * 預設(0) 新節點入隊時候的預設狀態。
    */
    int ws = pred.waitStatus;
    //  判斷上個節點ws狀態是否是 -1, 是則掛起     
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        /**  
        * 判斷上個節點是否是取消或者其他狀態。
        * 向前找到不是取消狀態的節點,修改ws狀態。
        * 注意:那些放棄的結點,由於被自己「加塞」到它們前邊,它們相當於形成一個無參照鏈,
        * 稍後就會被GC回收,這個操作實際是把佇列中的cancelled節點剔除掉。
        */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //  如果前驅節點正常,那就把上一個節點的狀態通過CAS的方式設定成-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

//  掛起當前執行緒
private final boolean parkAndCheckInterrupt() {
    //  掛起當前執行緒
    LockSupport.park(this);
    //  返回中斷標誌
    return Thread.interrupted();
}

六.  AQS的Lock鎖的release方法

//  互斥鎖模式   解鎖
public final boolean release(int arg) {
    //  嘗試是否可以解鎖
    if (tryRelease(arg)) {
        Node h = head;
        //  判斷雙連結串列是否存線上程排隊
        if (h != null && h.waitStatus != 0)
            //  喚醒後續執行緒
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//  嘗試是否可以解鎖
protected final boolean tryRelease(int releases) {
    //  鎖狀態 =  狀態 - 1 
    int c = getState() - releases;
    //  判斷鎖是是否是當前執行緒持有 
    if (Thread.currentThread() != getExclusiveOwnerThread())
        //  當前執行緒沒有持有丟擲異常
        throw new IllegalMonitorStateException();
    boolean free = false;
    //  當前鎖狀態變為0,則清空鎖歸屬執行緒
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    //  設定鎖狀態為0
    setState(c);
    return free;
}

//  喚醒執行緒
private void unparkSuccessor(Node node) {
    //  獲取頭節點的狀態
    int ws = node.waitStatus;
    if (ws < 0)
        //  通過CAS將頭節點的狀態設定為初始狀態
        compareAndSetWaitStatus(node, ws, 0);
    //  後繼節點
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        //  從尾節點開始往前遍歷,尋找離頭節點最近的等待狀態正常的節點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //  真正的喚醒操作
        LockSupport.unpark(s.thread);
}

 

以上僅供參考!!