JUC鎖:核心類AQS原始碼詳解

2022-09-04 15:01:08

1 疑點todo和解疑

同步狀態變數:state就是那個共用資源(private volatile int state;) Lock類繼承AQS類並定義lock()、unLock()的方法,表示獲取鎖和釋放鎖。多執行緒並行存取同一個lock範例,lock()方法會cas修改state變數,修改成功的執行緒獲得鎖,其他執行緒進入AQS佇列等待。

沒有必要!sync佇列是雙向連結串列結構,出隊時,head交替方式,只需要修改head和head後繼2個節點參照關係;固定head,就要修改head,head後繼,以及head後繼的後繼 共3個節點。顯然前者效率更高

不存在的,因為經過判斷得出此時node就是head的後繼。並且必須由這個取消節點node來喚醒後繼,要不node執行緒結束後,就沒有執行緒能夠喚醒佇列裡的其他節點了。

先說結果:由搶到鎖的那個執行緒來喚醒!
上述的場景是存在的,例如在非公平鎖模式中,B執行緒被A執行緒喚醒,A結束,B成為head,B去執行tryAcquire(),但此時C執行緒搶佔到鎖,B執行tryAcquire()沒有拿到鎖,再次park阻塞。C執行緒執行結束後將A喚醒

只有將前置節點狀態改為SIGNAL,才能確保當前節點可以被前置unPark喚醒。也就是說阻塞自己前先保證一定能夠被喚醒。因為程式碼中:
獨佔模式下,喚醒後繼前先限制:h.waitStatus != 0
共用模式下,喚醒後繼前先限制:h.waitStatus=SIGNAL

表示本執行緒在獲取資源期間,如果被其他執行緒中斷,本執行緒不會因為中斷而取消獲取資源,只是將中斷標記傳遞下去。

 When acquired in exclusive mode,
 * attempted acquires by other threads cannot succeed. Shared mode
 * acquires by multiple threads may (but need not) succeed. This class
 * does not "understand" these differences except in the
 * mechanical sense that when a shared mode acquire succeeds, the next
 * waiting thread (if one exists) must also determine whether it can
 * acquire as well. Threads waiting in the different modes share the
 * same FIFO queue.
  1. 共用模式:允許多個執行緒同時獲取資源;當一個節點的執行緒獲取共用資源後,需要要通知後繼共用節點的執行緒,也可以獲取了。共用節點具有傳播性,傳播性的目的也是儘快通知其他等待的執行緒儘快獲取鎖。
  2. 獨佔模式: 只能夠一個執行緒佔有資源,其它嘗試獲取資源的執行緒將會進入到佇列等待。
  3. 響應中斷並終止執行緒只要被中斷就不會獲取資源:兩種情況的中斷:1、剛嘗試獲取、2、進入佇列中等待,前者立即停止獲取,後者執行取消邏輯,等待節點變為取消狀態

A、B先後進入佇列,狀態都是0。A獲得資源,進入setHeadAndPropagate晉升為head,A進入doReleaseShared嘗試喚醒B時,但B還沒將A改為signal,因為A還是0,A將狀態改為PROPAGATE

2 AbstractQueuedSynchronizer學習總結

2.1 AQS要點總結

對於AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。

  1. 每一個節點都是由前一個節點喚醒
  2. 當節點發現前驅節點是head並且嘗試獲取成功,則會輪到該執行緒執行。
  3. condition queue中的節點向sync queue中轉移是通過signal操作完成的。
  4. SIGNAL,表示後面的節點需要執行。
  5. PROPAGATE:就是為了避免執行緒無法會喚醒的窘境。因為共用鎖會有很多執行緒獲取到鎖或者釋放鎖,所以有些方法是並行執行的,就會產生很多中間狀態,而PROPAGATE就是為了讓這些中間狀態不影響程式的正常執行。

2.2 細節分析

2.2.1 插入節點時先更新prev再更新前驅next

//addWaiter():
node.prev = pred; // 1 更新node節點的prev域
if (compareAndSetTail(pred, node)) {
    pred.next = node; //2 更新node前驅的next域
    return node;
}
//enq():
node.prev = t; // 1 更新node節點的prev域
if (compareAndSetTail(t, node)) {
    t.next = node;//2 更新node前驅的next域
    return t;
}
//unparkSuccessor():
Node s = node.next; //通過.next來直接獲取到節點的後繼節點,這個節點的後繼的prev一定指向節點本身
      //....
        if (s != null)
            LockSupport.unpark(s.thread);
  1. addWaiter() 或者enq()插入節點時,都是先更新節點的prev域,再更新它前驅的next域。那麼通過node.next()取到的後繼,後繼的prev域一定是指向node本身。如果先更新next域,在更新prev域時出現異常,那麼通過.next取到不是完整的節點
  2. unparkSuccessor()喚醒後繼時,Node s = node.next; 通過.next來獲取node的後繼,後繼的prev一定指向node本身

2.2.2 為什麼unparkSuccessor()要從尾部往前遍歷

因為取消節點的next域指向了自身,所以不能從通過next來遍歷,但prev是完整的,所以通過prev來遍歷。

2.2.3 AQS的設計,儘快喚醒其他等待執行緒體現在3個地方

  1. 共用鎖的傳播性。
  2. doReleaseShared()中head改變,會迴圈喚醒head的後繼節點。
  3. 執行緒獲取鎖失敗後入佇列並不會立刻阻塞,而是判斷是否應該阻塞shouldParkAfterFailedAcquire,如果前繼是head,會再給一次機會獲取鎖。

3 AQS 簡介

AQS是一個用來構建鎖和同步器的框架。理論參考:JUC同步器框架

三個基本元件相互共同作業:

  1. 同步狀態的原子性管理;
  2. 執行緒的阻塞與喚醒;
  3. 佇列的管理;

同步器一般包含兩種方法,一種是acquire,另一種是release。acquire操作阻塞呼叫的執行緒,直到或除非同步狀態允許其繼續執行。而release操作則是通過某種方式改變同步狀態,使得一或多個被acquire阻塞的執行緒繼續執行。

3.1 AQS核心思想

  1. 如果請求的共用資源空閒,則將當前請求執行緒設定為有效工作執行緒,並且將共用資源設定為鎖狀態
  2. 設計一套機制:【執行緒如何阻塞等待以及被喚醒時鎖如何分配】?這個機制AQS是用CLH佇列鎖實現的
  3. CLH佇列鎖:一個虛擬的雙向佇列,AQS是將每條請求共用資源的執行緒封裝成一個CLH鎖佇列的一個節點(Node)來實現鎖的分配。【嚴格的FIFO佇列,框架不支援基於優先順序的同步
  4. 使用一個int成員變數來表示同步狀態,使用volatile修飾保證執行緒可見性,並使用CAS思想進行值維護。

3.2 對資源的共用方式

兩種方式:

  1. Exclusive(獨佔):只有一個執行緒能執行。又可分為公平鎖和非公平鎖:
    1. 公平鎖:按照執行緒在佇列中的排隊順序,先到者先拿到鎖
    2. 非公平鎖:當執行緒要獲取鎖時,無視佇列順序直接去搶鎖,誰搶到就是誰的
  2. Share(共用):多個執行緒可同時執行

3.3 AQS資料結構

分析類,首先就要分析底層採用了何種資料結構,抓住核心點進行分析:

  1. Sync queue,即同步佇列,是雙向連結串列,包括head節點和tail節點,head節點主要用作後續的排程
  2. Condition queue不是必須的,其是一個單向連結串列,只有當使用Condition時,才會存在此單向連結串列。並且可能會有多個Condition queue

4 AbstractQueuedSynchronizer原始碼分析

4.1 類的繼承關係

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable

繼承自抽象類:AbstractOwnableSynchronizer,父類別提供獨佔執行緒的設定與獲取的方法

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private static final long serialVersionUID = 3737899427754241961L;
    protected AbstractOwnableSynchronizer() { }//   建構函式
    private transient Thread exclusiveOwnerThread; //獨佔模式下的執行緒
    // 設定獨佔執行緒 
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    // 獲取獨佔執行緒 
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

4.1.1 AQS需要子類重寫的方法

    protected boolean tryAcquire(int arg) {//獨佔方式獲取鎖
        throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) { //釋放獨佔的鎖
        throw new UnsupportedOperationException();
    }
    protected int tryAcquireShared(int arg) { //以共用方式獲取鎖
        throw new UnsupportedOperationException();
    }
    protected boolean tryReleaseShared(int arg) {//釋放共用鎖
        throw new UnsupportedOperationException();
    }
    protected boolean isHeldExclusively() {//是否獨佔資源
        throw new UnsupportedOperationException();
    }

關於重寫說明
目的是將共用資源state的讀寫交給子類管理,AQS專注在佇列的維護以及執行緒的阻塞與喚醒

4.2 類的常數/成員變數

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
    implements java.io.Serializable {    
    private static final long serialVersionUID = 7373984972572414691L;    
    // 頭節點
    private transient volatile Node head;    
    // 尾節點
    private transient volatile Node tail;    
    //0:表示沒有執行緒獲取到鎖;1表示有執行緒獲取到鎖;大於1:表示有執行緒獲得了鎖,且允許重入
    private volatile int state;    
    // 自旋時間
    static final long spinForTimeoutThreshold = 1000L;
    
    // 以下跟cas有關
    private static final Unsafe unsafe = Unsafe.getUnsafe();  // Unsafe類範例
    private static final long stateOffset; // state記憶體偏移地址
    private static final long headOffset; // head記憶體偏移地址
    private static final long tailOffset; // state記憶體偏移地址
    private static final long waitStatusOffset;// tail記憶體偏移地址
    private static final long nextOffset; // next記憶體偏移地址
    // 靜態初始化塊
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));
        } catch (Exception ex) { throw new Error(ex); }
    }
}

說明:

  1. 屬性中包含了頭節點head,尾節點tail,狀態state、自旋時間spinForTimeoutThreshold
  2. AbstractQueuedSynchronizer抽象的屬性在記憶體中的偏移地址,通過該偏移地址,可以獲取和設定該屬性的值
  3. 同時還包括一個靜態初始化塊,用於載入記憶體偏移地址。

4.3 靜態內部類Node

執行緒封裝成Node並具備狀態

static final class Node
{
	// 模式,分為共用與獨佔
	static final Node SHARED = new Node();// 共用模式
	static final Node EXCLUSIVE = null; // 獨佔模式
	// 節點狀態
	static final int CANCELLED =  1;//表示當前的執行緒被取消
	static final int SIGNAL    = -1;//表示當前節點的後繼節點包含的執行緒需要被執行【被unpark】,
	static final int CONDITION = -2;//表示當前節點在等待condition,也就是在condition佇列中
	static final int PROPAGATE = -3;//表示當前場景下後續的acquireShared能夠得以執行
	volatile int waitStatus;//節點狀態;表示當前節點在sync佇列中,等待著獲取鎖

	volatile Node prev; // 指向當前節點的前驅
	volatile Node next;// 指向當前節點的後繼
	volatile Thread thread;//節點所對應的執行緒
	Node nextWaiter;// 下一個等待者    只跟condition有關
    private transient volatile Node head; // 頭節點  懶載入
    private transient volatile Node tail; //尾節點  懶載入
    private volatile int state;  // 同步狀態

	// 節點是否在共用模式下等待
	final boolean isShared() {
	    return nextWaiter == SHARED;
	}
	// 獲取前驅節點,若前驅節點為空,丟擲異常
	final Node predecessor() throws NullPointerException {
	    Node p = prev;// 儲存前驅節點
	    if (p == null)
	        throw new NullPointerException();
	    else
	        return p;
	}
	// 無參建構函式
	Node() { // Used to establish initial head or SHARED marker
	}
	// 建構函式
	Node(Thread thread, Node mode) {	// Used by addWaiter
	    this.nextWaiter = mode;
	    this.thread = thread;
	}

	Node(Thread thread, int waitStatus) { // Used by Condition
	    this.waitStatus = waitStatus;
	    this.thread = thread;
	}
}

關於Node說明

每個被阻塞的執行緒都會被封裝成一個Node節點,放入佇列。Node包含了一個Thread型別的參照,並且有自己的狀態:

  • CANCELLED:1,表示當前的執行緒被取消。
  • SIGNAL:-1,表示負責unPark後繼【由前一個節點unPark下一個節點】。
  • CONDITION:-2,表示當前節點在等待condition,也就是在condition queue中。
  • PROPAGATE:-3,表示當前場景下後續的acquireShared能夠得以執行。
  • 預設值:0,發生在:1、節點加入到佇列成為tail節點,2、節點成為head,並準備喚醒後繼

4.4 建構函式

protected AbstractQueuedSynchronizer() { }    //預設的無參構造

4.5 核心方法分析

4.5.1 核心方法概覽

public final void acquireShared(int arg) {...} // 獲取共用資源的入口(忽略中斷)
protected int tryAcquireShared(int arg); // 嘗試獲取共用資源
private void doAcquireShared(int arg) {...} // AQS中獲取共用資源流程整合
private Node addWaiter(Node mode){...} // 將node加入到同步佇列的尾部
protected int tryAcquireShared(int arg); // 嘗試獲取共用資源
private void setHeadAndPropagate(Node node, int propagate) {...} // 設定 同步佇列的head節點,以及觸發"傳播"操作
private void doReleaseShared() {...} // 遍歷同步佇列,調整節點狀態,喚醒待申請節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {...} // 如果獲取資源失敗,則整理隊中節點狀態,並判斷是否要將執行緒掛起
private final boolean parkAndCheckInterrupt() {...} // 將執行緒掛起,並在掛起被喚醒後檢查是否要中斷執行緒(返回是否中斷)
private void cancelAcquire(Node node) {...} // 取消當前節點獲取資源,將其從同步佇列中移除

4.5.2 acquire()方法

該函數以獨佔模式獲取(資源),忽略中斷
流程如下:

原始碼如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt(); //來到這裡,表示執行緒拿到鎖,並且讀取到執行緒的中斷標識為true,呼叫selfInterrupt()來恢復執行緒的interrupted中斷標誌(被parkAndCheckInterrupt()擦除了,所以再設定一次)。
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();//執行緒設定interrupted中斷標誌
}
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

acquire()總結

  1. 先呼叫tryAcquire(),由子類實現來嘗試加鎖,如果獲取到鎖,則執行緒繼續執行;反則,節點加入佇列
  2. 呼叫addWaiter(),將呼叫執行緒封裝成為一個節點並放入AQS佇列。
  3. 呼叫acquireQueued(),先park阻塞等待,直到被unPark喚醒。
  4. 如果執行緒被設定中斷,那麼acquire結束前,需要重新設定中斷。

4.5.3 addWaiter()方法

addWaiter:快速新增的方式往sync queue尾部新增節點

// 新增等待者
    private Node addWaiter(Node mode) {
        // 新生成一個節點
        Node node = new Node(Thread.currentThread(), mode);
        // 建立臨時參照pred,跟tail指向相同地址
        Node pred = tail;
        if (pred != null) { // 尾節點不為空,即佇列已經初始化過
            // 將node的prev域連線到尾節點
            node.prev = pred; 
            if (compareAndSetTail(pred, node)) { // cas更新tail,指向新建立的node
                // 設定尾節點的next域為node
                pred.next = node;  // 結合 node.prev = pred;  形成雙向連結串列
                return node; // 返回新生成的節點
            }
        }
        enq(node); // 佇列還未初始化,或者是compareAndSetTail操作失敗,則進入enq
        return node;
    }
    //關於並行情景說明:
    // 從 Node pred = tail;  到 compareAndSetTail(pred, node); 期間,佇列可能插入了新的節點,pred指向的不是最新的tail,那麼compareAndSetTail(pred, node) 就會執行失敗,同時 node.prev = pred; node的前驅也不是最新的tail。
    // 通過enq()來解決並行問題,enq()通過自旋+cas來保證執行緒安全

addWaiter()說明

  1. 使用快速新增的方式(失敗不重試)建立新節點並新增到往佇列尾部,更新tail
  2. 如果佇列還沒有初始化或者cas失敗,則呼叫enq()插入佇列

4.5.4 enq()方法

    // 執行緒安全地建立佇列、或者將節點插入佇列、
    private Node enq(final Node node) {
        for (;;) { // 自旋+cas,確保節點能夠成功入佇列
            Node t = tail;//尾節點
            if (t == null) { // 尾節點為空,即還沒被初始化
                if (compareAndSetHead(new Node())) // 設定head。 !!!!注意,這裡是new node,沒有使用引數的node,因此head節點不參照任何執行緒
                    tail = head; // 頭節點與尾節點都指向同一個新生節點。迴圈繼續,進入else後,引數node將插入到佇列
            } else { // 尾節點不為空,即已經被初始化過
                node.prev = t;  // 將node節點的prev域連線到尾節點
                if (compareAndSetTail(t, node)) { // 比較更新tail,node成為新的tail
                    // 設定尾節點的next域為node
                    t.next = node;   // 結合 node.prev = t;   形成雙向連結串列
                    return t; // 返回Node的前驅節點
                }
            }
        }
    }
    
    //CAS head field. Used only by enq.
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    //CAS head field. Used only by enq.
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

enq()方法總結:

  1. 功能:cas+自旋方式將節點插入佇列
  2. 如果佇列未初始化,先建立頭節點head(head不指向任務執行緒),再將節點插入到佇列(當第一個節點被建立後,佇列實際有兩個節點:head+業務節點)。
  3. 如果佇列已經初始化,則直接插入佇列

4.5.5 acquireQueue()方法

作用:sync佇列中的節點在獨佔且忽略中斷的模式下獲取(資源)

原始碼如下:

// sync佇列中的節點在獨佔且忽略中斷的模式下獲取(資源):
    final boolean acquireQueued(final Node node, int arg) {
        // 標誌
        boolean failed = true;
        try {
            // 中斷標識。如果執行緒喚醒後,中斷標識是true,外層的acquire()將進入selfInterrupt()。
            boolean interrupted = false;
            // 無限迴圈 :如果前驅不是head,那執行緒將park阻塞,等待前面的節點依次執行,直到被unPark喚醒
            for (;;) {
                // 獲取node的前驅,如果前驅是head,則表明前面已經沒有執行緒等待了,該執行緒可能成為工作執行緒
                final Node p = node.predecessor(); 
                // 前驅為頭節點並且成功獲得鎖
                if (p == head && tryAcquire(arg)) {
                    setHead(node); // node晉升為head
                    p.next = null; // 舊head的next域指向null,將會被GC,移出佇列
                   
                   failed = false; // 設定標誌
                    return interrupted; //拿到鎖,break迴圈,並返回中斷標識
                }
                //執行到這裡,前驅非head 或者 前驅是head但獲取鎖失敗,那麼:1、將前驅狀態改為signal 2、當前執行緒unPark阻塞
                //shouldParkAfterFailedAcquire():尋找非取消狀態的前驅,如果狀態為signal返回true 反則,將前驅狀態改為signal、再返回false
                //前驅是signal ,執行parkAndCheckInterrupt()後,當前執行緒park阻塞。一直到執行緒被unPark喚醒,再返回執行緒的中斷狀態
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//parkAndCheckInterrupt返回true表明執行緒中斷狀態為true
                   //上面if同時成立,才會執行。
                   interrupted = true;  //那麼把中斷標識置為true
            }
        } finally { //(有異常,在丟擲之前執行finally;沒有異常,在return之前執行finally)
            if (failed)//只有try的程式碼塊出現異常,failed才會是true。什麼情景會產生異常?cancelAcquire分析時有說明
                cancelAcquire(node); //執行取消邏輯
        }
    }
    
    final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
      private void setHead(Node node) {
        head = node;
        node.thread = null;//再次表明head的thread屬性是空的
        node.prev = null;
    }

acquireQueue()總結

  1. 功能:節點進入AQS佇列後,先park阻塞等待,直到被unPark喚醒,或者中斷喚醒
  2. 找到非取消狀態的前驅(取消狀態的將會被移出佇列並GC),如果前驅是SIGNAL,那麼當前節點進入park阻塞,否則,先將前驅改為SIGNAL,再進入park阻塞。
  3. 被unPark喚醒後,判斷前驅是頭節點且獲取到資源(tryAcquire成功),當前節點晉升為頭節點。自此,執行緒獲取到鎖
  4. 呼叫shouldParkAfterFailedAcquire和parkAndCheckInterrupt函數,表明只有當該節點的前驅節點的狀態為SIGNAL時,才可以對該節點所封裝的執行緒進行park操作。

4.5.6 shouldParkAfterFailedAcquire()方法

// 當獲取(資源)失敗後:1、判斷能否將當前執行緒park;2、修改前驅節點狀態為signal
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 獲取前驅節點的狀態
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 狀態為SIGNAL
            // 只有當前驅節點為 signal時,才返回true ,表示當前執行緒可以安全地park阻塞;其它情況返回false
            return true; 
            //跳過那些CANCELLED狀態的前驅
        if (ws > 0) { // 表示狀態為CANCELLED,為1 
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0); // 找到pred節點前面最近的一個狀態不為CANCELLED的節點;然後跳出迴圈並返回false
            pred.next = node; 
        } else { // 為PROPAGATE -3 或者是0 ,(為CONDITION -2時,表示此節點在condition queue中) 
             // cas更新前驅的狀態為SIGNAL.如果前驅是頭節點,那麼頭節點ws=SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 
        }
        // 不能進行park操作
        return false;
    }
    //CAS waitStatus field of a node.
    private static final boolean compareAndSetWaitStatus(Node node,int expect,  int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }

shouldParkAfterFailedAcquire()總結:

  1. 如果前驅狀態是:SIGNAL,返回true。表示當前節點可以安全地unPark()阻塞
  2. 遇到取消的前驅節點,則跳過。這些被取消的節點會從佇列中移除並GC
  3. 如果前驅狀態不是:SIGNAL,將前驅狀態改為:SIGNAL,返回false,回到1 繼續

4.5.7 parkAndCheckInterrupt()方法

// 進行park操作並且返回該執行緒的中斷標識
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); //外面的for迴圈可能會導致多次park,不過沒關係,park允許多次執行
       //被喚醒之後,返回中斷標記,即如果是正常喚醒則返回false,如果是由於中斷醒來,就返回true
        return Thread.interrupted(); // acquireQueued() 中宣告的interrupted 將會被更新為這裡的返回結果
    }
    public static boolean interrupted() {
        return currentThread().isInterrupted(true);//返回當前執行緒interrupted中斷標記,同時會清除此interrupted標記
    }

方法總結:

  1. 執行park操作(前提:前驅狀態是SIGNAL),在佇列中阻塞等待。
  2. 被unPark()喚醒後,返回執行緒的interrupted中斷標識,並且清除interrupted標記

4.5.8 cancelAcquire()方法

什麼時候才會執行cancelAcquire?

在lockInterruptibly()會通過丟擲中斷異常來執行cancelAcquire方法,lock方法過程中則不會執行該程式碼,作者這麼些的意圖在於for迴圈內部如果出現不可控的因素導致產生未知的異常,則會執行cancelAcquire,很明顯這屬於一種相對偏保守的保險程式碼。
// 取消獲取鎖
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null) // node為空,返回
            return;
        node.thread = null;// thread置空 備註1
        // Skip cancelled predecessors
        Node pred = node.prev;// pred表示:最靠近node並且狀態不等於取消的前驅節點
        while (pred.waitStatus > 0) 
            node.prev = pred = pred.prev; //更新pred,往列頭推進 
            
        Node predNext = pred.next; //predNext表示:pred的後繼
        // 設定node節點的狀態為CANCELLED
        node.waitStatus = Node.CANCELLED; //備註2
        if (node == tail && compareAndSetTail(node, pred)) { // 若node節點為尾節點,則pred成為尾節點  備註3
            // pred的next域置為null
            compareAndSetNext(pred, predNext, null);
        } else { // 2、node節點不為尾節點,或者比較設定不成功
            int ws;
            //下面一串判斷,最終目標:在node移除佇列前,將有效的前驅節點狀態改為signal
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) { 
                // pred節點不為頭節點,並且
                    //pred節點的狀態為SIGNAL)或者 
                      // pred節點狀態小於等於0,並且比較並設定等待狀態為SIGNAL成功,並且pred節點所封裝的執行緒不為空
                Node next = node.next;
                if (next != null && next.waitStatus <= 0) // 後繼不為空並且後繼的狀態小於等於0
                    compareAndSetNext(pred, predNext, next); // 比較並設定pred.next = next;  到這裡:node的前驅節點指向node的後繼節點。 備註4
            } else {
            // 這裡,pred==head (3、即node是head的後繼)或者pred.status=0,-2時 【前面while (pred.waitStatus > 0) 已經限制了pred一定是<=0】,執行:
                unparkSuccessor(node); // 喚醒node的後繼
            }

            node.next = node; // help GC  後繼節點指向自身  備註5
        }
    }
    //修改引數node的next域
     private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

對cancelAcquire()總結之前,先明確以下兩點:

  1. 基於對acquire()方法的分析,呼叫鏈:addWait()->enq()->acquireQueue()->cancelAcquire(node),進入到cancelAcquire()時,節點node一定已經在佇列中,而且它不會是head,並且沒有持有鎖
  2. AQS通過管理這些屬性:waitStatus、thread、prev、next、head、tail、nextWaiter ,成為一個虛擬的列隊。

cancelAcquire(node)總結

cancelAcquire()負責將node移出佇列,並保持佇列中其他節點的順序關係不變,它做了以下工作:

  • waitStatus更新為cancel (備註2)
  • thread更新為null(備註1)
  • tail:如果node是尾節點,更新tail參照 (備註3)
  • head:不需要更新(node不會是head)
  • prev:沒有更新
  • next: node前置的next域更新指向node後繼,並且node的next指向了自身 (備註4、備註5)
  • nextWaiter:不需要更新(跟condition有關,這裡不涉及)

執行cancelAcquire後,佇列變成這樣的:

發現:

  1. node沒有移出佇列,因為被後繼的prev所參照。
  2. node.next變了,指向了自身,這就能解釋為什麼unparkSuccessor()是從後往前遍歷:因為取消節點的next域指向了自身,所以不能從通過next來遍歷,但prev是完整的,所以通過prev來遍歷。
  3. 取消節點,暫存在佇列中,當後繼節點被喚醒,執行shouldParkAfterFailedAcquire後,取消節點的參照鏈清空,移出佇列,最後GC回收

4.5.9 unparkSuccessor()方法

    // 喚醒node節點的後繼
    private void unparkSuccessor(Node node) {
       
        // 獲取node節點的等待狀態
        int ws = node.waitStatus;
        if (ws < 0) // 狀態值小於0,為SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
            // cas節點狀態為0
            compareAndSetWaitStatus(node, ws, 0);//如果head沒有後繼的情況下,狀態會一直=0
        
        Node s = node.next;
        //若後繼為空,或後繼已取消,則從尾部往前遍歷 找到最靠近的一個處於正常阻塞狀態的節點進行喚醒
        // 什麼時候s==null ?   node的後繼節點是取消狀態時,node.next為null
        if (s == null || s.waitStatus > 0) { 
            s = null; 
            // 由尾節點向前倒著遍歷佇列,但不會超過node節點
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) 
                    s = t; 
        }
        if (s != null)
            LockSupport.unpark(s.thread);//喚醒s節點執行緒
    }

unparkSuccessor()總結

  1. 作用:找到有效的後繼節點unPark喚醒
  2. 尋找有效後繼時從尾往前倒著遍歷:因為取消節點的next域指向了自身,所以不能從通過next來遍歷
  3. 將發起unPark喚醒的節點(只能是head)狀態改為0(意味著在head喚醒後繼,到被後繼推出佇列的期間,狀態變為0)

4.5.10 release()方法

以獨佔模式釋放物件,其原始碼如下:

public final boolean release(int arg) {
        if (tryRelease(arg)) { //如果釋放鎖成功
            Node h = head; 
            // 執行緒A呼叫acquire()獲取到鎖之後,A執行緒節點變為head,然後A呼叫release 釋放鎖,存在兩種情況:
            // 1、 如果有新的執行緒B入隊,B成為後繼節點,B會將A狀態改為SIGNAL,那麼(h != null && h.waitStatus != 0 )成立,unparkSuccessor()喚醒後繼節點
            // 2、如果A後面沒有節點,A狀態是預設值:0 ,那麼h.waitStatus != 0 不成立,直接返回true,不需要喚醒後繼節點。
            if (h != null && h.waitStatus != 0) // 頭節點不為空並且頭節點狀態不為0
                unparkSuccessor(h); //由head喚醒後繼節點
            return true;
        }
        return false;
    }

release()總結:

  1. 功能:釋放獨佔鎖
  2. 先呼叫tryRelease()由子類實現釋放鎖
  3. 如果釋放鎖成功,然後unPark喚醒後繼節點(沒有後繼就不需要喚醒)

4.5.11 acquireSharedInterruptibly()方法

   //獲取共用資源,響應中斷
   public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) //讀取執行緒中斷標記,然後擦除標記
            throw new InterruptedException(); //中斷標記為true,丟擲中斷異常,停止執行
        if (tryAcquireShared(arg) < 0)  //呼叫子類實現方法 獲取資源
            doAcquireSharedInterruptibly(arg); //沒有獲取到,那麼再嘗試獲取(進入佇列排隊等待)
    }

獲取共用資源流程圖:

acquireSharedInterruptibly()總結

  1. 共用模式獲取物件,響應中斷並終止獲取
  2. 先呼叫子類實現獲取資源,沒有獲取到再加隊佇列等待。

4.5.12 doAcquireSharedInterruptibly()方法

//獲取共用資源,響應中斷
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); //增加等待節點
        boolean failed = true;
        try {
            for (;;) {//無限迴圈,直到r>0
                final Node p = node.predecessor(); // p表示 剛插入節點的前驅
               //1、如果前驅是head
               if (p == head) {
                    int r = tryAcquireShared(arg);//呼叫子類實現方法 嘗試獲取共用資源
                    if (r >= 0) { // >0 表示 獲取到資源
                    // 1、如果是ReentrantReadWriteLock、CountDownLatch ,有可能r=1
                    // 2、如果是Semaphore,有可能r=0
                    // 1、2 都呼叫setHeadAndPropagate進行共用傳播判斷
                        setHeadAndPropagate(node, r);// 更新head並進行共用傳播
                        p.next = null; // 將佇列頭節點的next域置空,之後,這個節點將被GC回收
                        failed = false;
                        return;
                    }
                }
                // 2、前驅不是head
                //執行緒park阻塞,直至被unPark喚醒,或者被其它執行緒中斷喚醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException(); //進入這裡表示執行緒中斷標記為true,那麼丟擲中斷異常
            }
        } finally {
            if (failed) //當try 程式碼塊有異常:中斷異常 或 其他未知異常,failed才是true
                cancelAcquire(node);//取消獲取資源
        }
    }

doAcquireSharedInterruptibly()總結

  1. 建立節點並插入aqs佇列,將前驅狀態改為signal,park阻塞,等待unPark喚醒。
  2. 正常喚醒後,無限迴圈直到前驅是head並且呼叫子類方法獲取共用資源成功,呼叫setHeadAndPropagate()成為head並進行共用傳播
  3. 被中斷喚醒、或者回圈等待過程發生中斷異常,執行cancelAcquire()取消獲取資源

4.5.13 setHeadAndPropagate()方法

setHeadAndPropagate在獲取共用資源的時候被呼叫

// 設定 同步佇列的head節點,以及觸發"傳播"操作
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 記錄更新前的head
        setHead(node); //引數node 成為新的head
         //判斷:
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next; //獲取node後繼
            //後繼為空或者後繼是等待共用資源的節點
            if (s == null || s.isShared()) 
                doReleaseShared(); //釋放共用資源
        }
    }

滿足呼叫doReleaseShared的條件分析

  1. propagate > 0
    ReentrantReadWriteLock、CountDownLatch 呼叫tryAcquireShared()返回1進入,滿足條件;Semaphore 進入,propagate可能等於0,不滿足,繼續2

  2. h == null
    h == null 表示舊head變為null,程式沒有地方設定head=null,並且這裡h參照著head意味著head不會被GC。 因此,h == null不滿足條件,繼續3 【不知道哪種情況下h==null todo】

  3. h.waitStatus < 0

  • h.waitStatus==1:取消,不能由取消節點喚醒後繼,不滿足條件

setHeadAndPropagate()總結:

  1. 方法功能:設定 同步佇列的head節點,以及觸發"傳播"操作:
  2. 如果head的後繼是共用型別節點或者為null,呼叫doReleaseShared()來喚醒後繼

4.5.14 doReleaseShared()方法

//遍歷同步佇列,調整節點狀態,喚醒待申請節點
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //1、head 不等於 tail 且不等於 null
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {  //如果head狀態為signal ,cas修改為0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); //喚醒後繼
                }
                //如果節點的後繼還沒有將其前驅改為signal,這裡ws==0是成立的
                else if (ws == 0 &&  //如果head狀態為0,cas修改為propagate 
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;               //  如果在int ws = h.waitStatus; 之後,後繼將head節點改為signal,那麼cas失敗,continue繼續迴圈後, if (ws == Node.SIGNAL) 滿足,那麼將會喚醒後繼。
            }
           // 只有head沒有發生變化,迴圈才會結束,若head改變,繼續迴圈  
            if (h == head)                   // loop if head changed
                break;
        }
    }

doReleaseShared()總結

  1. 如果頭節點狀態為signal,那麼CAS更新頭節點狀態為0,成功則呼叫unparkSuccessor()喚醒後繼,失敗則重試
  2. 如果頭節點狀態為0,那麼將CAS更新頭節點狀態為PROPAGTATE ,失敗則重試。
  3. 最後如果判斷head是否發生變化,有變化則重複1、2,沒有變化則方法結束。
  4. PROPAGTATE狀態的意義是,增加一個狀態判斷,當前驅獲取資源,後繼同時也有機會獲取到資源

4.5.15 releaseShared()方法

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

releaseShared()方法總結

  1. 呼叫子類的實現方法tryReleaseShared()釋放n個共用資源,釋放成功則繼續呼叫doReleaseShared()來喚醒佇列中的等待節點

5 取消節點移出連結串列分析

有兩種情景,會將取消節點徹底移出連結串列:

  1. 頭節點unPark喚醒後繼時,後繼節點喚醒後重新進入shouldParkAfterFailedAcquire()
  2. 取消節點後面有新節點入列時,新節點執行shouldParkAfterFailedAcquire()

以第一個情景為例子分析:

6 在shared模式中為什麼需要PROPAGATE狀態

結論:在前驅節點獲取資源時,後繼也能夠有機會申請資源,不需要等待前驅通過releaseShare()來喚醒
分析如下:

1:A B 先後進入佇列
 2:A被喚醒,獲得資源,呼叫setHeadAndPropagate(),晉升為head
 3、B呼叫shouldParkAfterFailedAcquire(),嘗試將A狀態改為signal但未執行
 	4、A進入doReleaseShared(),A狀態等於0(3還沒執行),進入ws == 0 分支處理。
 		5、此時3執行完成,B將A的狀態改為signal,然後B park阻塞
 		6、A執行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)失敗,continue繼續
 			7、A進入(ws == Node.SIGNAL)分支,執行compareAndSetWaitStatus(h, Node.SIGNAL, 0)成功,然後再執行unparkSuccessor(),將B喚醒。
 				8、A將B喚醒後,A去執行拿到資源後的操作,B也成功拿到資源並執行。
 				因為步驟6的continue,B不需要等待A執行releaseShare()被喚醒,在A獲取到資源時同時B也能快速獲取到資源,A、B可以同時執行獲得資源後的任務