多執行緒並行:AQS原始碼分析(2)——共用鎖的實現原理

2023-02-21 12:02:30

  在上一篇文章多執行緒並行(一)中我們通過acquire()詳細地分析了AQS中的獨佔鎖的獲取流程,提到獨佔鎖,自然少不了共用鎖,所以我們這邊文章就以AQS中的acquireShared()方法為例,來分析下並行程式設計中共用鎖的獲取與釋放吧,獲取共用鎖的大體流程和獲取獨佔鎖一樣,但是因為共用鎖可以被多個執行緒同時持有,所以共用鎖比起獨佔鎖來可能更復雜,文章有點長,靜下心來,慢慢讀,讀完之後可能會使你收穫頗多。

   通過上篇文章的分析,我們發現AQS中主要做三件事:1、同步狀態的state的獲取和釋放,即同步狀態的管理;2、同步佇列的維護;3、執行緒的阻塞和喚醒,即執行緒間的共同作業;AQS中定義了大量的同步狀態管理的模板方法,比如acquireShared()就是一個執行緒獲取共用鎖的入口方法,我們就從這個方法開始我們的共用鎖之旅吧!

  1、acquireShared(int arg)方法:

1 public final void acquireShared(int arg) {
2         //獲取共用資源成功直接返回
3         if (tryAcquireShared(arg) < 0)
4             //獲取資源不成功執行此方法阻塞
5             doAcquireShared(arg);
6     }

  這個方法是AQS中定義的一個模版方法,也是獲取共用鎖的入口,呼叫tryAcquireShared()嘗試獲取共用鎖,如果獲取共用鎖成功,則此方法直接返回;獲取共用鎖不成功,則執行doAcquireShared()方法,將當前節點包裝成Node節點,加入到同步佇列中,進行阻塞,直到被其它執行緒喚醒了,成功獲取到了共用鎖再返回。是不是和獲取獨佔鎖的流程很類似呢?是的,大致流程基本一致,但是兩者最大的區別共用鎖在同一時刻只能被一個執行緒持有,而共用鎖在同一時刻可能會被多個執行緒同時持有,所以共用鎖比獨佔鎖更復雜。其中這個tryAcquireShared()方法我們這個模版類中只提供了定義,並沒有提供實現,具體實現還需要自定義的同步器去實現。我們接著往下看doAcquireShared()方法:

  2、doAcquireShared方法:

 1     private void doAcquireShared(int arg) {
 2         //將當前執行緒加入到同步佇列中,並標記為共用模式
 3         final Node node = addWaiter(Node.SHARED);
 4         //執行緒阻塞等待獲取共用資源的過程中是否發生了異常
 5         boolean failed = true;
 6         try {
 7             //現在在阻塞等待獲取資源的過程中,其它執行緒對此執行緒是否發生了中斷請求
 8             boolean interrupted = false;
 9            
10             /*自旋,找到合適點的點將當前執行緒掛起,再尋找合適點的過程中也不斷嘗試重新獲取共用鎖,因為可能再這個嘗試的過程中,其它執行緒釋放了共用鎖*/
11             for (;;) {
12                 //找到當前節點的前繼節點
13                 final Node p = node.predecessor();
14                 
15                 //當前節點的前繼節點是同步佇列的頭節點
16                 if (p == head) {
17                     //嘗試獲取指定量的共用資源
18                     int r = tryAcquireShared(arg);
19                     //當前Node節點的執行緒成功獲取到了共用資源
20                     if (r >= 0) {
21                         //將當前執行緒Node節點設定為head頭節點,並嘗試喚醒後面的阻塞節點
22                         setHeadAndPropagate(node, r);
23                         p.next = null; // help GC
24                         //等待獲取資源的過程中發生了執行緒中斷的請求是不響應執行緒中斷的,所以這裡要將執行緒中斷補上。
25                         if (interrupted)
26                             //獲取獨佔鎖是放在acquire()方法中處理的,不過作用都一樣。
27                             selfInterrupt();
28                         //表示獲取阻塞獲取資源的過程中沒有發生異常,就不用執行finally中的取消方法了。
29                         failed = false;
30                         return;
31                     }
32                 }
33                 
34                 //找到當前被阻塞執行緒節點的前繼有效節點,將它的狀態設定為Node.SIGNAL
35                 if (shouldParkAfterFailedAcquire(p, node) &&
36                         /**
37                          * 找到了有效前繼節點並它的狀態設定為Node.SIGNAL,那麼我們就可以將當前節點park(),
38                          * 等待前繼節點釋放資源後喚醒它,喚醒之後在進行一次執行緒中斷檢測,進入下次"自旋"。
39                          */
40                     parkAndCheckInterrupt())
41                     interrupted = true;
42             }
43         } finally {
44             //阻塞等待獲取共用資源的時候發生了異常,需要將當前Node節點出隊,上一篇文章中講過,這裡就不再贅述了。
45             if (failed)
46                 cancelAcquire(node);
47         }
48     }

  上面已經說過這個方法主要幹兩件事情:1、將阻塞的執行緒包裝成Node節點,加入到同步佇列中;2、通過一定次數的「自旋」操作,當前執行緒找到合適的點,將自己掛起,等待其它執行緒喚醒。

  在尋找這個「合適點」(這個合適點的選擇,上篇文章多執行緒並行(一):以AQS中acquire()方法為例來分析多執行緒間的同步與共同作業中提到過,有不清楚的可以在這裡找到答案)的過程中,有可能其它執行緒釋放了共用鎖,那麼當前執行緒應該檢查下有沒有資格獲取,有資格獲取,並且獲取成功,那麼就將它自己設定為頭節點,然後喚醒後繼節點之後再返回,至此獲取鎖的整個流程就完了。

  細心的讀者可能發現,這個「自旋」中,將嘗試獲取鎖放在前面,將阻塞判定放在後面執行,現在想想這是不是一個型別do{}while()模型呢,要是第一次直接獲取鎖成功了,是不是執行緒就少了一次阻塞----》喚醒的狀態轉化呢?

  上面方法中也提到過,「如果當前執行緒節點的前繼節點是佇列head節點時,我們就可以嘗試獲取一次共用資源」。為什麼當前節點的前繼節點不是head節點的時候,就不能嘗試獲取共用鎖呢?這是因為當前這個LCH同步佇列是嚴格按照FIFO出隊的,當前節點前繼節點不是head, 說明在當前節點之前還有執行緒被阻塞等待獲取共用鎖,所以當前執行緒節點就應該老老實實地等待,等待它的前繼節點獲取成功共用鎖或者釋放了共用鎖之後,再喚醒它去嘗試獲取共用鎖吧。

  在獨佔鎖模式中,因為鎖只能被一個執行緒持有,所以當同步佇列中的一個執行緒獲取了獨佔鎖之後,只需要將它自身設定為頭節點,讓原來的頭節點「出隊」就可以了。但是,在共用鎖模式下,因為共用鎖可以被多個執行緒同時持有,當前執行緒獲取共用鎖成功,並將自身設定為頭節點之後,還需判斷同步佇列中是否有滿足喚醒條件的後繼節點,如果有則繼續喚醒後繼節點去競爭共用鎖,這個是通過 setHeadAndPropagate()來實現的。

  3、setHeadAndPropagate()方法分析:

private void setHeadAndPropagate(Node node, int propagate) {
        //後繼節點成功獲取了共用鎖,佇列的"舊head"還沒有改變,將其儲存下來,鎖定到方法的區域性變數做後序的判斷使用;
        Node h = head; // Record old head for check below
        /**
         * 將這個獲取共用鎖成功的後繼節點設定為同步佇列的「新head」,此時同步佇列的head發生變化, 此執行緒還未喚起任何執行緒。
         */
        setHead(node);
        /**
         * 1、h == null這個條件什麼時候成立呢?仔細翻了下AQS中的原始碼發現:
         * 這個setHeadAndPropagate()方法只在共用鎖模式下,同步佇列head的後繼節點成功獲取了共用鎖才會呼叫。
         * 獲取到共用鎖的當前執行緒是同步佇列的頭結點的後繼節點,"舊head"有後繼節點,說明同步佇列不為空,那麼"舊head"也必定不為空,
         * 此方法中第一行通過h == head,在執行setHead(node)方法之前將"舊head"儲存了下來,所以h == null必定不會成立,
         * 至於為什麼這麼寫呢? 查閱了下資料網上說"發現這個是防止空指標異常發生的標準寫法(既如果要取一個物件的某個屬性進行判斷的時候,首先對這個物件進行null判斷)。"
         * 這說的過去吧?
         * 
         * 2、(h = head) == null這個條件什麼時候成立呢?
         * 這個條件也是不可能成立的,下面這種情況應該是最常見的:
         *  (1)、例如有個Semaphore範例s初始化了2個許可,執行緒A首先呼叫s.acquire(2)申請了兩個許可,成功申請到了許可;
         *  (2)、執行緒B呼叫了s.acquire()方法申請一個許可,申請失敗,加入到同步佇列;
         *  (3)、執行緒C呼叫了s.acquire()方法申請一個許可,申請失敗,加入到同步佇列;
         *  (4)、執行緒A呼叫了s.releaseShared(2)方法釋放了兩個許可,再呼叫doReleaseShared()方法,進行同步佇列喚醒;
         *  (6)、首先喚醒了同步佇列中的執行緒B,B執行緒獲取到共用鎖:
         *      a)、如果此時執行緒B還未setHead(Node)方法,還未改變同步佇列的head頭結點,那麼執行緒A的喚醒工作就結束,也僅僅只是喚醒了同步佇列中的執行緒B,
         *              則必定有(h = head) == Node(C) != null成立,執行緒C的喚醒工作仍然需要執行緒B去執行;
         *      b)、如果此時執行緒B執行了SetHead(Node)方法,改變了同步佇列的head頭結點,那麼執行緒A同時也會喚醒執行緒C,相當於執行緒A同時喚醒了執行緒B和執行緒C:
         *         1)、如果執行緒C中的setHeadAndPropagate()線上程B前呼叫完畢(即執行緒C執行了setHead()方法改變了同步佇列的head),那麼 (h = head) == Node(C);
         *         2)、如果執行緒C中的setHeadAndPropagate()線上程B之後才呼叫(即執行緒C此時還未執行setHead()方法,未改變同步佇列的head),那麼 (h = head) == Node(B)
         *  所以綜上所述,只要執行過addWaiter()方法,向同步佇列中新增過執行緒,那麼(h = head)== null必定不成立。只能理解為「防止空指標的標準寫法」。 
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            /**
             * s == null這種情況是可能存在的,如果當前喚醒的這個node節點是同步佇列的尾節點就可能出現node.next == null;
             * s.isShared()指定是共用鎖模式,當前執行緒獲取共用鎖之後,是需要嘗試喚醒同步佇列中的其它執行緒的。
             */
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

  上面提到的這個方法其實就做了兩件事情:1、將當前獲取共用鎖的執行緒設定為同步佇列的頭節點;2、根據同步佇列頭節點head的狀態,來決定是否需要喚醒後續節點,符合條件就呼叫doReleaseShared()方法執行喚醒後續節點的操作。存在的各種情況,也在上面的程式碼中上面程式碼中分析過了,下面我們接著往下走來分析doReleaseShared()方法吧! 

  4、doReleaseShared()方法:

 1  private void doReleaseShared() {
 2         for (;;) {
 3             Node h = head;
 4             /**
 5              * h != null保證了佇列不為空,h != tail保證了佇列中有需要喚醒的節點,
 6              * 如果這不能同時滿足說明佇列中沒有需要喚醒的節點,此時h == head這個條件是成立的,
 7              * 直接跳轉到h == head判斷中break,此方法結束執行。
 8              */
 9             if(h != null && h != tail) {
10                 int ws = h.waitStatus;
11                 //如果頭節點的狀態是Node.SIGNAL說明後續有節點是需要喚醒的,
12                 if (ws == Node.SIGNAL) {
13                     /**
14                      * 考慮到共用鎖可以被多執行緒並行持有,可以採用CAS操作,將設定頭節點的狀態為的0的compareAndSetWaitStatus(h,Node.SIGNAL,0)的操作
15                      * 和unparkSuccessor(h)喚醒後節點的操作繫結在一起,這個CAS操作成功,說明頭節點之前肯定是Node.SIGNAL狀態,那麼後繼結點肯定能被喚醒。
16                      */
17                     if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
18                         continue;            // loop to recheck cases
19                     unparkSuccessor(h);
20                 }
21                 /**
22                  * ws == 0說明頭節點的後繼節點已經被喚醒或者即將被喚醒。
23                  */
24                 else if (ws == 0 &&
25                          !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
26                     continue;                // loop on failed CAS
27             }
28             //同步佇列的頭節點未發生變化,跳出喚醒的動作
29             if (h == head)                   // loop if head changed
30                 break;
31         }
32     }

   這個方法可能是共用鎖中最難理解的一個方法了,粗略地讀完上面的註釋,你是否會有以下幾個問題呢?

  Q1、什麼時候會呼叫doReleaseShared()方法呢?

  翻了下AQS原始碼我們發現有兩個地方呼叫:

  1、在獲取共用鎖acquireShared()方法中,滿足一定的條件下(共用鎖還可以被同步佇列中的其他執行緒獲取情況下),可以呼叫;

  2、在釋放共用鎖releaseShared()方法中,釋放成功一定會被呼叫;

  Q2、誰會呼叫呼叫doReleaseShared()方法呢?

  在通過上一篇文章我們瞭解到,獨佔鎖中,只有獲取了鎖的執行緒才能呼叫release釋放鎖,因此呼叫unparkSuccessor(h)喚醒後繼節點的必然是持有鎖的執行緒,該執行緒可看做是當前的頭節點(雖然在setHead方法中已經將頭節點的thread屬性設為了null,但是這個頭節點曾經代表的就是這個執行緒);

  而共用鎖中,持有鎖的執行緒可以有多個,這些執行緒都可以呼叫releaseShared()方法釋放鎖;假如這些執行緒都是從同步佇列出隊獲取共用鎖的,那麼他們必然曾經成為過head或者現在就是head,如果是reReaseShared()中的方法呼叫doReleaseShared()方法,那麼可能現在呼叫此方法的執行緒,已經不是同步佇列頭節點所代表的執行緒了,頭節點可能被易主好多次了。

  Q3、呼叫該doReleaseShared()方法的目的是什麼呢,何時結束這個喚醒操作呢?

  無論是在acquireShared()呼叫,還是在releaseShared()方法中呼叫,其目的就是在共用鎖是可用的狀態,喚醒頭節點的後繼有效節點,競爭共用鎖。但是共用鎖和獨佔鎖的一個重要區別是:共用鎖在頭節點發生變化時(說明後繼節點已經成功獲取了共用鎖,並執行了setHead()方法,將其設定為head),會再執行一次自旋喚醒新的頭節點的後繼節點,去競爭共用鎖。

  上面的話是什麼意思呢?換句話說:就是當前執行緒完成後繼節點喚醒任務,需要退出的時候,檢查了一下頭節點,喚醒的這個節點已經是新的頭節點了(這個喚醒的節點也成功獲取到了共用鎖),那麼它的後繼節點是有資格競爭共用鎖的,所以需要繼續喚醒它的後續節點,周而復始,直到h == head不再執行後續節點喚醒。

  Q4、什麼時候才會發生滿足h == head這個條件呢?

  經過分析大概有以下這麼幾種情況(歡迎大家補充,有不對的地方還請大家指出):

  1、阻塞佇列為空,即阻塞佇列中沒有需要喚醒的節點,滿足h == head這個條件。

  2、執行緒A喚醒了後繼執行緒B,但是執行緒B並沒有獲取到共用資源(執行緒B當然也就不會執行setHead()方法改變同步佇列的head了),又發生了執行緒阻塞,不需要再喚醒後續的執行緒了,也滿足h == head這個條件,那麼B的後續執行緒的喚醒工作應該交給執行緒B獲取資源時候在負責去喚醒吧。

  3、執行緒A喚醒了執行緒B,執行緒B成功獲取了資源,還是還未執行到setHead()這個方法;當前執行緒A,此時判斷h == head也成立了,其呼叫的doReleaseShared()方法結束了,那麼將喚醒執行緒B後續節點的工作,就應該交給剛剛被喚醒的執行緒B去執行了。

  Q5、ws == 0這個狀態怎麼理解?什麼情況下才會出現ws == 0這個狀態呢?

  其實我們仔細分析之前的程式碼我們可以得出以下結論:head的後繼節點已經被喚醒或者即將被喚醒,分以下幾種情況:

  1、有執行緒A剛釋放了鎖,剛執行了unparkSuccessor裡的if (ws < 0) compareAndSetWaitStatus(node, ws, 0);把head的狀態設定為了0,然後嘗試喚醒head後繼執行緒B,這裡也分3種情況:

              (1)、執行了if (ws < 0) compareAndSetWaitStatus(node, ws, 0),還沒有執行LockSupport(this)方法(後繼節點中的執行緒即將被喚醒);

    (2)、head後繼執行緒B獲取鎖成功,直到head後繼執行緒將自己設定為AQS的新head的這段時間裡,head的狀態為0(後繼節點中的執行緒已經被喚醒);

    (3)、head後繼執行緒B獲取鎖失敗,直到將head重置為Node.SIGNAL這段時間裡,這個head的狀態也是為0的(後繼節點中的執行緒已經被喚醒);

  2、同步佇列中只有一個head == tail 的dummy node節點,它的狀態為0;

  3、在第2中情況上更進一步,同步佇列中只有一個head == tail 的dummy node節點,它的狀態為0,此時有個執行緒A獲取共用鎖失敗了,但是隻進行了入隊操作,還未執行shouldParkAfterFailedAcquire()方法,未將head節點設定未Node.SIGNAL狀態,這段時間head的狀態也0(後繼節點中的執行緒即將被喚醒);

   綜上所述,我們不難看出,其實head.waitStatus == 0 這個狀態是一箇中間狀態,可能會很快改變。後繼節點獲取共用鎖失敗了,head節點不會發生變化,只不過很快會將head.waitStatus 設定未Node.SIGNAL;後繼結點獲取共用鎖成功,後繼節點會被設定為新的head,假如後繼節點不是尾節點,那麼必定新的head.waitSatus == Node.SIGNAL,如果後繼節點是尾節點,那麼必定head.waitStatus == 0,因為沒有後續入隊節點將它的狀態置為Node.SIGNAL。

  Q6、什麼時候會出現 ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)為true的情形呢?

  讀過上面的Q5中的分析,我們可以發現這個條件成立可能有兩種狀況:

  1)、第一種可能的情況是:有個執行緒A執行到doReleaseShared()方法,執行緒B獲取共用鎖的時候在同步佇列中阻塞的,此時有個執行緒C也執行了doReleaseShared()方法,doReleaseShared()方法呼叫unparkSuccessor()方法,設定head的ws == 0,此時執行緒A正好執行到了這個ws == 0的位置,而此時執行緒B獲取共用鎖失敗,執行shouldParkAfterFailedAcquire()方法,又設定head的ws = Node.SIGNAL,恰好執行緒A執行到compareAndSetWaitStatus(h,0,Node.PROPAGETE)為false;

  2)、第二種可能的情況是:同步佇列中head頭節點是剛剛成為頭節點的,它的waitStatus值還為0,尾節點是在這之後剛剛加進來的。這種怎麼理解呢?同步佇列中的「舊的尾節點」狀態是0,用執行緒A表示它,此時執行緒A剛剛獲取到共用鎖,將自己設定為頭結點head節點,此時有個執行緒B獲取共用鎖失敗,將自己加入到同步佇列中,此時執行緒B還未執行shouldParkAfterFailedAcquire()方法,改變同步佇列頭結點head的狀態;此時執行緒A執行了doReleaseShared()中的方法,發現ws == 0,但是恰好就在此時,執行緒B執行了

shouldParkAfterFailedAcquire()方法,設定head的waitStatus == Node.SIGNAL,緊接著執行緒A執行執行到compareAndSetWaitStatus(h,0,Node.PROPAGATE)失敗了,繼續continue進入下次「自旋」。

  由此可見,doReleaseShared()方法中else if 這個分支的 && 連線了兩個不一致的狀態,分別對應了shouldParkAfterFailedAcquirecompareAndSetWaitStatus(pred, ws, Node.SIGNAL)執行成功前和執行成功後,因為doReleaseSharedshouldParkAfterFailedAcquire是可以並行執行的,所以這一條件是有可能滿足的,只是滿足的條件非常嚴苛,可能只是一瞬間的事。

  至於共用鎖的釋放邏輯,相信看完上面的分析,再去看也不是什麼難事,這裡就不再贅述了。

  總結:

  • 共用鎖的呼叫框架和獨佔鎖和實現原理非常相似,兩者最大不同在於獲取鎖的邏輯——共用鎖可以被多個執行緒同時持有,而獨佔鎖同一時刻只能被一個執行緒持有。
  • 由於共用鎖同一時刻可以被多個執行緒持有,因此當頭節點獲取到共用鎖時,可以立即喚醒後繼節點來爭鎖,而不必等到釋放鎖的時候。因此,共用鎖觸發喚醒後繼節點的行為可能有兩處,一處在當前節點成功獲得共用鎖後,一處在當前節點釋放共用鎖後。

   鑑於水平有限就只能分析到如此了,如有說的不對的地方,還請大家批評指正,共同交流,共同進步。

  參考文章地址:

  1、https://www.cnblogs.com/waterystone/p/4920797.html

  2、https://blog.csdn.net/anlian523/article/details/106319294

  3、https://segmentfault.com/a/1190000016447307

  4、https://www.cnblogs.com/micrari/p/6937995.html