java多執行緒並行:以AQS中acquire()方法為例來分析多執行緒間的同步與共同作業

2023-02-16 18:00:36

  談到java中的並行,我們就避不開執行緒之間的同步和共同作業問題,談到執行緒同步和共同作業我們就不能不談談jdk中提供的AbstractQueuedSynchronizer(翻譯過來就是抽象的佇列同步器)機制;

  (一)、AQS中的state和Node含義:

    AQS中提供了一個int volatile state狀態的變數用來標識共用資源,AQS定義了兩種資源的佔用方式:

    1、獨佔模式(EXCLUSIVE):表示同一個資源,在同一時刻只能被一個執行緒持有,例如ReentrantLock等;
    2、共用模式(SHARED):表示同一個資源,在同一時刻可以被多個執行緒同時持有,例如Semaphore,CountDownLatch等;

    同時也提供了一個LCH佇列,用來存放獲取共用資源時候發生阻塞的Node節點,這個節點是對需要獲取資源執行緒的一個封裝,包含了執行緒本身和Node節點的狀態waitStatus,一共分為五種:

    /**表示當前節點中執行緒已經被取消排程,當timeout或者interrupt(假如會響應中斷的話)會觸發節點變更為此狀態,此節點的狀態再不會發生變化*/

    static final int CANCELLED = 1;

    /**表示當前節點中執行緒釋放資源後需要喚醒後繼節點執行緒,在採用尾插法將新結點加入到同步佇列的時候,會將新結點的前繼節點設定為SIGNAL */
    static final int SIGNAL = -1;
    /**表示當前節點中的執行緒在等待一個Condition喚醒,在其他執行緒中呼叫了這個Condition的signal()會將此Node從等待佇列的隊頭轉移到同步佇列的隊尾,嘗試競爭共用資源 */
    static final int CONDITION = -2;
    /**共用模式下,當前節點中的執行緒不僅需要喚醒後繼節點,還需要喚醒後繼節點的後繼節點*/
    static final int PROPAGATE = -3;

    除了上面這四種還有一個0,表示節點初始狀態,可以看出waitStatus<0才代表該節點是一個有效節點(即結點中的執行緒可以正常排程)。

    AQS的設計其實是採用了模版方法的設計思想,在AbstractQueuedSynchronizer中這個頂層類中只提供了一些公共的方法實現如:同步佇列的維護等,而共用資源的獲取和釋放只提供了方法的定義,並不提供具體的實現(只是丟擲了  unsupportedOperationException異常),通過這種方式就達到讓自定義的佇列同步器去強制實現的目的。

    上面我們提到,AQS定義了資源的兩種佔用方式:獨佔和共用,主要也就對應tryAcquire()-tryRelease(),tryAcquireShared()-tryReleaseShared()兩組方法需要我們自己去實現了:

    1、tryAcquire()獨佔模式,嘗試獲取資源,成功為true,失敗為false;

    2、tryRelease()獨佔模式,嘗試釋放資源,成功為true,失敗為false;

    3、tryAcquireShared()共用模式,嘗試獲取資源,負數為失敗,等於0為成功獲取,沒有剩餘資源,大於0為成功獲取,有剩餘共用資源;

    4、tryReleaseShared()共用模式,嘗試釋放資源,釋放之後需要喚醒等待節點為true,否則為false;

  (二)、程式碼剖析:

  1、acquire(int)方法:

  此方法是獲取共用資源的入口方法,程式碼如下:

    public final void acquire(int arg) {
        /*嘗試獲取共用資源,嘗試成功直接返回*/
        if (!tryAcquire(arg)  
                /* 1、搶佔共用資源失敗,則將當前節點放入到同步佇列的尾部,並標記為獨佔模式;
                 * 2、使執行緒阻塞在同步佇列中獲取資源,直到獲取成功才返回;如果整個過程中被中斷過就返回true,否則就返回false;*/
                && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
            /*阻塞獲取資源的過程中是不響應執行緒中斷的,內部進行了中斷檢測,檢測到了中斷請求,所以這塊進行執行緒中斷*/
            selfInterrupt();
        }
    }

  上面程式碼的大概執行流程是:

  1、加塞搶佔共用資源(因為同步佇列中可能還有其它節點等待),獲取成功則直接返回;

  2、當前執行緒搶佔共用資源失敗,呼叫addWaiter()將當前執行緒包裝成Node節點,加入同步佇列的尾部,並將當前節點標記為獨佔模式(EXCLUSIVE),並返回這個節點;

  3、當前執行緒呼叫acquireQueued()方法阻塞在同步佇列上獲取共用資源,該方法是一個同步方法,直到成功獲取了共用資源才會返回,在這個阻塞獲取資源的過程中,如果檢測到發生了執行緒的中斷會返回true,否則會返回false;

  4、在第3步中阻塞獲取資源的過程中也不會響應中斷,所以在上個獲取共用過程中,檢測到了執行緒中斷標記會在acquireQueued()方法返回為true時候,再呼叫selfInterrupt()中斷一次執行緒;

  2、addWaiter(Node mode)方法:

private Node addWaiter(Node mode) {
        /*以給定的模式將當前執行緒包裝成node節點*/
        Node node = new Node(Thread.currentThread(), mode);
        /*快速採用尾插法,將當前節點插入到同步佇列的隊尾*/
        Node predNode = tail;
        if (predNode != null) {
            /*preNode <-- node*/
            node.prev = predNode;
            /*採用CAS將node設定阻塞佇列的尾節點,設定成功,說明沒有並行*/
            if (compareAndSetTail(tail, node)) {
                predNode.next = node;
                /*尾插法插入成功,則直接返回當前節點*/
                return node;
            }
        }
        /*"自旋"將節點加入到佇列的尾部,直到成功為止*/
        enq(node);
        return node;
    }

  此方法是當前執行緒首次搶佔共用資源不成功,將當前執行緒以指定的模式(獨佔或者共用)包裝成Node插入到同步佇列的尾節點的方法,其執行邏輯也在程式碼中詳細註釋了,再概括下:

  1、將當前執行緒包裝為一個新的Node節點,並標記為獨佔模式;

  2、如果當前同步佇列的尾節點不為空(表示當前佇列不為空),採用尾插法,將新的Node節點插入到同步佇列的尾部,考慮到多執行緒並行的安全問題採用了CAS方式設定同步佇列的尾節點,設定成功則就直接返回這個新結點;

  3、如果快速插入不成功(可能有兩種情況:1、tail為空,即當前同步佇列為空;2、tail不為空,但是在使用CAS設定尾節點的時候出現了執行緒並行安全問題),則就呼叫enq(Node),採用「自旋」直到

將新結點成功插入到同步佇列的尾部;

  3、enq(Node) 方法:

 private Node enq(final Node node) {
        /*"自旋",將給定的節點插入到同步佇列的尾部*/
        for (;;) {
            Node t = tail;
            /*同步佇列為空,則建立一個thread為null的Node節點作為同步佇列的頭結點,並且將尾節點也設定為頭結點*/
            if (t == null) {
                /*CAS操作,設定同步佇列的頭結點*/
                if (compareAndSetHead(new Node())) {
                    /*將尾節點設定為頭結點,進入下次"自旋"*/
                    tail = head;
                }
            }else {
                /*尾部節點不為空,則進行正常新增動作*/
                node.prev = t;
                /*CAS操作,設定同步佇列的頭結點*/
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

  此方法採用了「自旋」操作,並結合CAS操作,在保證多執行緒並行執行緒安全的前提下,一定可以安全地將這個新結點插入到同步佇列的尾部。

  此段程式碼的執行流程是:

  1、判斷這個同步佇列是否為空(判斷tail為空即可),為空則建立一個空節點(不包含任何執行緒),並向尾節點也指向頭結點,

  2、然後進行下一次的自旋嘗試CAS操作,將方法傳入的Node節點嘗試加入到佇列的尾部,也通過一定次數的自旋操作,一定會加入到同步佇列的尾部,然後退出;

  3、如果一開始進入這個方法,佇列不為空,則就執行第2步驟不斷嘗試,直到成功;

  到此為止addWaiter()方法中的邏輯就分析完了,執行完畢之後返回,就行呼叫acquireQueued(Node)方法阻塞此執行緒,等待獲取資源了。

  acquireQueued(Node,int)方法:

final boolean acquireQueued(final Node node, int arg) {
        /*標記阻塞獲取資源的過程中是否發生了異常*/
        boolean failed = true;
        
        try {
            /*標記執行緒阻塞的過程中是否發生了中斷*/
            boolean interrupted = false;
            /*執行緒自旋阻塞*/
            for(;;){
          
/*獲取當前節點的前驅結點*/ final Node p = node.predecessor(); /*1、前驅結點是頭結點,則說明當前執行緒有資格獲取共用資源,嘗試獲取,獲取成功,將當前節點設定為頭結點*/ if (p == head && tryAcquire(arg)) { /*將當前節點設定為頭結點*/ setHead(node); p.next = null; //help GC failed = false;
            /*返回執行緒在阻塞的過程中是否接受到了中斷請求*/
return interrupted; } /*2、3當前節點的前驅結點不是頭結點,判斷當前執行緒是否可以掛起*/ if (shouldParkAfterFailedAcquire(p, node) /*4、當前執行緒可以掛起,則掛起執行緒,並且執行緒被unpark()或者interrupt()喚醒,檢查執行緒的狀態*/ && parkAndCheckInterrupt()) { interrupted = true; } } }finally{
  
if (failed) { /*5、阻塞獲取同步資源的時候,發生了異常,將當前Node節點從同步佇列中出隊*/ cancelAcquire(node); } }

  acquireQueued(Node)這個方法的目的就是使得當前執行緒阻塞,等待獲取資源,獲取成功之後才返回。那麼如何阻塞呢?看了上面的程式碼,讀到這裡我想大家心裡都有了答案:要麼自旋,要麼主動park(),此方法中採用了這兩種方式結合的方法,其大致的執行流程如下:

  1、自旋,首先當前節點的前繼節點是頭結點(走到這裡來了,說明前繼節點正在佔用共用資源,有可能在這個過程中正好釋放了),那麼我們當前執行緒就有資格去嘗試獲取共用資源,如果獲取共用資源成功,則結束自旋阻塞;

  2、如果前繼節點不是頭結點,或者爭搶共用資源失敗,那麼我們呼叫shouldParkAfterFailedAcquire(Node,Node)方法,判斷是否可以將此執行緒暫時掛起(不能無限制地自旋,會造成CPU佔用率飆升,安全的做法是自旋找到一個合適的點,將當前執行緒park()阻塞掛起);

  3、當前掛起之前,要向前尋找能將它喚醒的前繼節點,待前繼節點釋放資源之後,unpark()喚醒當前被阻塞的節點;

  4、執行緒已經到達安全點,可以呼叫parkAndCheckInterrupt()阻塞並等待喚醒,喚醒之後再檢查下,阻塞的這個過程中是否發生了執行緒中斷請求,由於這個阻塞過程是不響應執行緒中斷的,所以需要將這個中斷請求的狀態傳播出去;

  5、阻塞獲取同步資源的時候,發生了異常,取消當前執行緒的在同步佇列中的排隊;

  shouldParkAfterFailedAcquire(Node, Node )方法:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        /*判斷前驅結點的狀態,只有前驅結點的狀態為SIGNAL,後繼節點才能被喚醒,所以其可以安心地掛起來了*/
        if (ws == node.waitStatus) {
            return true;
        }
        
        /*ws>0表示前驅結點中的執行緒已經被取消排程了,則認為其是無效節點,繼續向前查詢,直至找到有效狀態的節點*/
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        }else {
            /*前驅結點狀態正常,將前驅結點狀態設定為SIGNAL,則前驅結點釋放資源的時候,就可以嘗試喚醒當前這個後繼節點了*/
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

   這個方法是在自旋中用來判斷是否可以將執行緒安全地park(),阻塞自旋的,那麼何時才能將當前執行緒安全地掛起呢?回想下,我們之前提到的節點的五種狀態中有一種SIGNAL,表示當前節點的執行緒釋放資源,可以喚醒後繼節點,所以我們就線上程掛起之前找到它的有效的前繼結點,將它的waitStatus狀態設定為SIGNAL,就可以保證前繼節點釋放資源之後,當前節點中的執行緒就可以被及時地喚醒,結束阻塞了,當前執行緒掛起之前的準備工作都做完了,那麼接下來就需要呼叫parkAndCheckInterrupt()方法,進行執行緒的掛起了。

  parkAndCheckInterrupt()方法:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

  執行到這裡了,說明執行緒可以阻塞,呼叫park()方法阻塞執行緒,等待其他執行緒中unpark()或者interrupt()喚醒次執行緒,喚醒之後執行Thread.interrupted()方法,檢測阻塞過程中的執行緒請求中斷,進入下次自旋,嘗試獲取共用資源。如果在阻塞獲取資源的過程中,發生了異常,failed = true,則執行finally中cancelAcquire(Node)方法,取消當前節點中執行緒的排程。

  上面說了分析了這麼多,只說了執行緒間的獲取資源時候的同步問題,那麼執行緒間的共同作業在哪裡體現呢?答案就是在acquireQueued(Node,int)方法中的finally塊中,執行緒在阻塞獲取共用資源的時候發生了異常,就會執行此方法將此節點從同步佇列中出隊,下面我們來分析下cancelAcquire(Node)方法:

  cancelAcquire(Node)方法:

 1     private void cancelAcquire(Node node) {
 2         //當前節點為空,則說明當前執行緒永遠不會被排程到了,所以直接返回
 3         if (node == null) {
 4             return;
 5         }
 6         
 7         /**
 8          * 接下來將點前Node節點從同步佇列出隊,主要做以下幾件事:
 9          * 1、將當前節點不與任何執行緒繫結,設定當前節點為Node.CANCELLED狀態;
10          * 2、將當前取消節點的前置非取消節點和後置非取消節點"連結"起來;
11          * 3、如果前置節點釋放了鎖,那麼當前取消節點承擔起後續節點的喚醒職責。
12          */
13         
14         //1、取消當前節點與執行緒的繫結
15         node.thread = null;
16         
17         //2、找到當前節點的有效前繼節點pred
18         Node pred = node.prev;
19         while (pred.waitStatus > 0) {
20             //為什麼雙向連結串列從後往前遍歷呢?而不是從前往後遍歷呢?
21             node.prev = pred = pred.prev;
22         }
23         //用作CAS操作時候的條件判斷需要使用的值
24         Node predNext = pred.next;
25         
26         //3、將當前節點設定為取消狀態
27         node.waitStatus = Node.CANCELLED;
28         
29         /**
30          * 接下來就需要將當前取消節點的前後兩個有效節點"連結"起來了,"達成讓當前node節點出隊的目的"。
31          * 這裡按照node節點在同步佇列中的不同位置分了三種情況:
32          * 1、node節點是同步佇列的尾節點tail;
33          * 2、node節點既不是同步佇列頭結點head的後繼節點,也不是尾節點tail;
34          * 3、node節點是同步佇列頭結點head的後繼節點;
35          */
36         
37         //1、node是尾節點,並且執行過程中沒有並行,直接將pred設定為同步佇列的tail
38         if (node == tail && compareAndSetTail(node, pred)) {
39             /*
40              * 此時pred已經設定為同步佇列的tail,需要通過CAS操作,將pred的next指向null,沒有節點再參照node,就完成了node節點的出隊42              */
43             compareAndSetNext(pred, predNext, null);
44         }else {
45             /*
46              * 2、node不是尾節點,也不是頭結點head的後繼節點,那麼當前節點node出隊以後,node的有效前繼結點pred,
47              *  就有義務在它自身釋放資源的時候,喚醒node的有效後繼節點successor,即將pred的狀態設定為Node.SIGNAL;
48              */
49             int ws;
50             //能執行到這裡,說明當前node節點不是head的後繼節點,也不是同步佇列tail節點
51             if (pred != head &&
52                     ((ws = pred.waitStatus) == Node.SIGNAL ||
53                     //前繼節點狀態雖然有效但不是SIGNAL,採用CAS操作設定為SIGNAL確保後繼有效節點可以被喚醒
54                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 
55                     pred.thread != null) {
56                 Node next = node.next;
57                 //只負責喚醒有效後繼節點
58                 if (next != null && next.waitStatus <= 0) {
59                     /**
60                      * 下面這段程式碼相當於將pred-->next,我們提到這個同步佇列是個雙向佇列,那麼pred<--next這是誰執行的呢?
61                      * 答案是其他執行緒:其它執行緒在後序的獲取共用資源在同步佇列中阻塞的時候,呼叫shouldParkAfterFailedAcquire()方法,
62                      * 從後向前遍歷佇列,尋找能喚醒它的有效前繼節點,當找到node的時候,因為它的狀態已經是Node.CANCELLED,所以會忽略node節點,
63                      * 直到遍歷到有效前繼節點pred,將next.prev執行pred,即next--->pred,沒有節點再參照node節點,所以node節點至此才完成出隊。
64                      */
65                     compareAndSetNext(pred, predNext, next);
66                 }
67             }else {
68                 //3、說明node節點是同步佇列head的後繼節點,呼叫unparkSuccessor(Node)喚醒其他執行緒,達到讓當前node"出隊"。
69                 unparkSuccessor(node);
70             }
71             
72             node.next = node;//help GC
73         }
74     }

   這個方法的中的註釋寫的很詳細了,執行緒間的共同作業主要體現在第65行的程式碼中,原因也在註釋中寫明瞭,就不再贅述了。

  還有一個非常關鍵的問題就是:為什麼我們在遍歷同步佇列的時候是從尾部向前遍歷,而不是從頭部向尾部遍歷呢?我們可以回過頭去看看入隊時候的enq(Node)方法:

  

  關鍵在11行-14行這部分程式碼中,11行保證了多執行緒環境下,採用自旋可以將當前執行緒順利地加入到同步佇列的尾部。
  假如有執行緒A執行了滿足了if條件,成功將執行緒A放入了tail節點,還未執行到12行,t.next = null,此時發生了執行緒切換執行B執行緒,B執行緒也執行了此方法,並且執行完畢,尾插法會將B執行緒節點追加到A執行緒節點之後,這時候又有個C執行緒執行了遍歷操作,假設從隊頭向隊尾遍歷,遍歷到A節點時,那麼可能會出現t.next = null這種情況,停止遍歷,漏掉B執行緒節點的情況,而採用從同步佇列的尾部向頭部遍歷則可以避免這個問題。

  下個問題就是unparkSuccessor(node)方法的原理是什麼呢?

  unparkSuccessor(node)方法:

 1   private void unparkSuccessor(Node node) {
 2         /*
 3          * If status is negative (i.e., possibly needing signal) try
 4          * to clear in anticipation of signalling.  It is OK if this
 5          * fails or if status is changed by waiting thread.
 6          */
 7         //在這裡,這個節點其實是同步佇列的頭結點,頭結點喚醒後繼節點之後,使命就完成了,所以應該將其狀態置為0
 8         int ws = node.waitStatus;
 9         if (ws < 0)
10             compareAndSetWaitStatus(node, ws, 0);
11 
12         /*
13          * Thread to unpark is held in successor, which is normally
14          * just the next node.  But if cancelled or apparently null,
15          * traverse backwards from tail to find the actual
16          * non-cancelled successor.
17          */
18         Node s = node.next;
19         //因為s.next相當於從同步佇列的頭部遍歷所以可能會出現s == null的情況,上面分析過原因,不再贅述了。
20         if (s == null || s.waitStatus > 0) {
21             s = null;
22             //從同步佇列的尾部向前遍歷,找到當前node節點(頭結點)的最近的有效後繼節點
23             for (Node t = tail; t != null && t != node; t = t.prev)
24                 if (t.waitStatus <= 0)
25                     s = t;
26         }
27         
28         /**
29          * 找到最近的有效後繼節點,則喚醒後繼節點中的執行緒在parkAndCheckInterrupt()方法上的阻塞,去嘗試競爭共用資源,
30          * 這就體現了執行緒之間的共同作業,而在這個競爭的過程中也會忽略這個Node.CANCELLED狀態的節點,這當前node節點也就放棄了競爭共用資源的機會,相當於出隊了。
31          */
32         if (s != null)
33             LockSupport.unpark(s.thread);
34     }

   以上就是對unparksuccessor(Node)方法的簡單分析了。再回過頭來,我們在cancelAcquire(Node)將當前要取消Node按照位置關係分為了三種,為什麼我們會忽略head位置呢?

     從setHead()的實現以及所有呼叫的地方可以看出,head指向的節點必定是拿到鎖(或是競爭資源)的節點,而head的後繼節點則是有資格爭奪鎖的節點,我們不在需要甚至喚醒條件了。再後續的節點,就是阻塞著的了。head指向的節點,曾經關聯的執行緒必定已經獲取到資源,在執行了,所以head無需再關聯到該執行緒了。head所指向的節點,也無需再參與任何的競爭操作了。現在再來看node出隊時的分類,就好理解了。head既然不會參與任何資源競爭了,自然也就和cancelAquire()無關了。

  仔細分析這個acquire()方法流程非常複雜,找了個一張網上一個博主畫的流程圖非常棒,這裡借鑑一下:

  

  好了文章就寫到這裡了,鑑於水平有限,有說的不對的地方歡迎大家批評指正。

  參考文章地址:

  1、https://blog.csdn.net/weixin_38106322/article/details/107121149

  2、https://www.jianshu.com/p/01f2046aab64

  3、https://blog.csdn.net/foxException/article/details/108917338