談到java中的並行,我們就避不開執行緒之間的同步和共同作業問題,談到執行緒同步和共同作業我們就不能不談談jdk中提供的AbstractQueuedSynchronizer(翻譯過來就是抽象的佇列同步器)機制;
(一)、AQS中的state和Node含義:
AQS中提供了一個int volatile state狀態的變數用來標識共用資源,AQS定義了兩種資源的佔用方式:
同時也提供了一個LCH佇列,用來存放獲取共用資源時候發生阻塞的Node節點,這個節點是對需要獲取資源執行緒的一個封裝,包含了執行緒本身和Node節點的狀態waitStatus,一共分為五種:
/**表示當前節點中執行緒已經被取消排程,當timeout或者interrupt(假如會響應中斷的話)會觸發節點變更為此狀態,此節點的狀態再不會發生變化*/
static final int CANCELLED = 1;
/**表示當前節點中執行緒釋放資源後需要喚醒後繼節點執行緒,在採用尾插法將新結點加入到同步佇列的時候,會將新結點的前繼節點設定為SIGNAL */
static final int SIGNAL = -1;
/**表示當前節點中的執行緒在等待一個Condition喚醒,在其他執行緒中呼叫了這個Condition的signal()會將此Node從等待佇列的隊頭轉移到同步佇列的隊尾,嘗試競爭共用資源 */
static final int CONDITION = -2;
/**共用模式下,當前節點中的執行緒不僅需要喚醒後繼節點,還需要喚醒後繼節點的後繼節點*/
static final int PROPAGATE = -3;
除了上面這四種還有一個0,表示節點初始狀態,可以看出waitStatus<0才代表該節點是一個有效節點(即結點中的執行緒可以正常排程)。
AQS的設計其實是採用了模版方法的設計思想,在AbstractQueuedSynchronizer中這個頂層類中只提供了一些公共的方法實現如:同步佇列的維護等,而共用資源的獲取和釋放只提供了方法的定義,並不提供具體的實現(只是丟擲了 unsupportedOperationException異常),通過這種方式就達到讓自定義的佇列同步器去強制實現的目的。
上面我們提到,AQS定義了資源的兩種佔用方式:獨佔和共用,主要也就對應tryAcquire()-tryRelease(),tryAcquireShared()-tryReleaseShared()兩組方法需要我們自己去實現了:
1、tryAcquire()獨佔模式,嘗試獲取資源,成功為true,失敗為false;
2、tryRelease()獨佔模式,嘗試釋放資源,成功為true,失敗為false;
3、tryAcquireShared()共用模式,嘗試獲取資源,負數為失敗,等於0為成功獲取,沒有剩餘資源,大於0為成功獲取,有剩餘共用資源;
4、tryReleaseShared()共用模式,嘗試釋放資源,釋放之後需要喚醒等待節點為true,否則為false;
(二)、程式碼剖析:
1、acquire(int)方法:
此方法是獲取共用資源的入口方法,程式碼如下:
public final void acquire(int arg) { /*嘗試獲取共用資源,嘗試成功直接返回*/ if (!tryAcquire(arg) /* 1、搶佔共用資源失敗,則將當前節點放入到同步佇列的尾部,並標記為獨佔模式; * 2、使執行緒阻塞在同步佇列中獲取資源,直到獲取成功才返回;如果整個過程中被中斷過就返回true,否則就返回false;*/ && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { /*阻塞獲取資源的過程中是不響應執行緒中斷的,內部進行了中斷檢測,檢測到了中斷請求,所以這塊進行執行緒中斷*/ selfInterrupt(); } }
上面程式碼的大概執行流程是:
1、加塞搶佔共用資源(因為同步佇列中可能還有其它節點等待),獲取成功則直接返回;
2、當前執行緒搶佔共用資源失敗,呼叫addWaiter()將當前執行緒包裝成Node節點,加入同步佇列的尾部,並將當前節點標記為獨佔模式(EXCLUSIVE),並返回這個節點;
3、當前執行緒呼叫acquireQueued()方法阻塞在同步佇列上獲取共用資源,該方法是一個同步方法,直到成功獲取了共用資源才會返回,在這個阻塞獲取資源的過程中,如果檢測到發生了執行緒的中斷會返回true,否則會返回false;
4、在第3步中阻塞獲取資源的過程中也不會響應中斷,所以在上個獲取共用過程中,檢測到了執行緒中斷標記會在acquireQueued()方法返回為true時候,再呼叫selfInterrupt()中斷一次執行緒;
2、addWaiter(Node mode)方法:
private Node addWaiter(Node mode) { /*以給定的模式將當前執行緒包裝成node節點*/ Node node = new Node(Thread.currentThread(), mode); /*快速採用尾插法,將當前節點插入到同步佇列的隊尾*/ Node predNode = tail; if (predNode != null) { /*preNode <-- node*/ node.prev = predNode; /*採用CAS將node設定阻塞佇列的尾節點,設定成功,說明沒有並行*/ if (compareAndSetTail(tail, node)) { predNode.next = node; /*尾插法插入成功,則直接返回當前節點*/ return node; } } /*"自旋"將節點加入到佇列的尾部,直到成功為止*/ enq(node); return node; }
此方法是當前執行緒首次搶佔共用資源不成功,將當前執行緒以指定的模式(獨佔或者共用)包裝成Node插入到同步佇列的尾節點的方法,其執行邏輯也在程式碼中詳細註釋了,再概括下:
1、將當前執行緒包裝為一個新的Node節點,並標記為獨佔模式;
2、如果當前同步佇列的尾節點不為空(表示當前佇列不為空),採用尾插法,將新的Node節點插入到同步佇列的尾部,考慮到多執行緒並行的安全問題採用了CAS方式設定同步佇列的尾節點,設定成功則就直接返回這個新結點;
3、如果快速插入不成功(可能有兩種情況:1、tail為空,即當前同步佇列為空;2、tail不為空,但是在使用CAS設定尾節點的時候出現了執行緒並行安全問題),則就呼叫enq(Node),採用「自旋」,直到
將新結點成功插入到同步佇列的尾部;
3、enq(Node) 方法:
private Node enq(final Node node) { /*"自旋",將給定的節點插入到同步佇列的尾部*/ for (;;) { Node t = tail; /*同步佇列為空,則建立一個thread為null的Node節點作為同步佇列的頭結點,並且將尾節點也設定為頭結點*/ if (t == null) { /*CAS操作,設定同步佇列的頭結點*/ if (compareAndSetHead(new Node())) { /*將尾節點設定為頭結點,進入下次"自旋"*/ tail = head; } }else { /*尾部節點不為空,則進行正常新增動作*/ node.prev = t; /*CAS操作,設定同步佇列的頭結點*/ if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
此方法採用了「自旋」操作,並結合CAS操作,在保證多執行緒並行執行緒安全的前提下,一定可以安全地將這個新結點插入到同步佇列的尾部。
此段程式碼的執行流程是:
1、判斷這個同步佇列是否為空(判斷tail為空即可),為空則建立一個空節點(不包含任何執行緒),並向尾節點也指向頭結點,
2、然後進行下一次的自旋嘗試CAS操作,將方法傳入的Node節點嘗試加入到佇列的尾部,也通過一定次數的自旋操作,一定會加入到同步佇列的尾部,然後退出;
3、如果一開始進入這個方法,佇列不為空,則就執行第2步驟不斷嘗試,直到成功;
到此為止addWaiter()方法中的邏輯就分析完了,執行完畢之後返回,就行呼叫acquireQueued(Node)方法阻塞此執行緒,等待獲取資源了。
acquireQueued(Node,int)方法:
final boolean acquireQueued(final Node node, int arg) { /*標記阻塞獲取資源的過程中是否發生了異常*/ boolean failed = true; try { /*標記執行緒阻塞的過程中是否發生了中斷*/ boolean interrupted = false; /*執行緒自旋阻塞*/ for(;;){
/*獲取當前節點的前驅結點*/ final Node p = node.predecessor(); /*1、前驅結點是頭結點,則說明當前執行緒有資格獲取共用資源,嘗試獲取,獲取成功,將當前節點設定為頭結點*/ if (p == head && tryAcquire(arg)) { /*將當前節點設定為頭結點*/ setHead(node); p.next = null; //help GC failed = false;
/*返回執行緒在阻塞的過程中是否接受到了中斷請求*/ return interrupted; } /*2、3當前節點的前驅結點不是頭結點,判斷當前執行緒是否可以掛起*/ if (shouldParkAfterFailedAcquire(p, node) /*4、當前執行緒可以掛起,則掛起執行緒,並且執行緒被unpark()或者interrupt()喚醒,檢查執行緒的狀態*/ && parkAndCheckInterrupt()) { interrupted = true; } } }finally{
if (failed) { /*5、阻塞獲取同步資源的時候,發生了異常,將當前Node節點從同步佇列中出隊*/ cancelAcquire(node); } }
acquireQueued(Node)這個方法的目的就是使得當前執行緒阻塞,等待獲取資源,獲取成功之後才返回。那麼如何阻塞呢?看了上面的程式碼,讀到這裡我想大家心裡都有了答案:要麼自旋,要麼主動park(),此方法中採用了這兩種方式結合的方法,其大致的執行流程如下:
1、自旋,首先當前節點的前繼節點是頭結點(走到這裡來了,說明前繼節點正在佔用共用資源,有可能在這個過程中正好釋放了),那麼我們當前執行緒就有資格去嘗試獲取共用資源,如果獲取共用資源成功,則結束自旋阻塞;
2、如果前繼節點不是頭結點,或者爭搶共用資源失敗,那麼我們呼叫shouldParkAfterFailedAcquire(Node,Node)方法,判斷是否可以將此執行緒暫時掛起(不能無限制地自旋,會造成CPU佔用率飆升,安全的做法是自旋找到一個合適的點,將當前執行緒park()阻塞掛起);
3、當前掛起之前,要向前尋找能將它喚醒的前繼節點,待前繼節點釋放資源之後,unpark()喚醒當前被阻塞的節點;
4、執行緒已經到達安全點,可以呼叫parkAndCheckInterrupt()阻塞並等待喚醒,喚醒之後再檢查下,阻塞的這個過程中是否發生了執行緒中斷請求,由於這個阻塞過程是不響應執行緒中斷的,所以需要將這個中斷請求的狀態傳播出去;
5、阻塞獲取同步資源的時候,發生了異常,取消當前執行緒的在同步佇列中的排隊;
shouldParkAfterFailedAcquire(Node, Node )方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; /*判斷前驅結點的狀態,只有前驅結點的狀態為SIGNAL,後繼節點才能被喚醒,所以其可以安心地掛起來了*/ if (ws == node.waitStatus) { return true; } /*ws>0表示前驅結點中的執行緒已經被取消排程了,則認為其是無效節點,繼續向前查詢,直至找到有效狀態的節點*/ if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; }else { /*前驅結點狀態正常,將前驅結點狀態設定為SIGNAL,則前驅結點釋放資源的時候,就可以嘗試喚醒當前這個後繼節點了*/ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
這個方法是在自旋中用來判斷是否可以將執行緒安全地park(),阻塞自旋的,那麼何時才能將當前執行緒安全地掛起呢?回想下,我們之前提到的節點的五種狀態中有一種SIGNAL,表示當前節點的執行緒釋放資源,可以喚醒後繼節點,所以我們就線上程掛起之前找到它的有效的前繼結點,將它的waitStatus狀態設定為SIGNAL,就可以保證前繼節點釋放資源之後,當前節點中的執行緒就可以被及時地喚醒,結束阻塞了,當前執行緒掛起之前的準備工作都做完了,那麼接下來就需要呼叫parkAndCheckInterrupt()方法,進行執行緒的掛起了。
parkAndCheckInterrupt()方法:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
執行到這裡了,說明執行緒可以阻塞,呼叫park()方法阻塞執行緒,等待其他執行緒中unpark()或者interrupt()喚醒次執行緒,喚醒之後執行Thread.interrupted()方法,檢測阻塞過程中的執行緒請求中斷,進入下次自旋,嘗試獲取共用資源。如果在阻塞獲取資源的過程中,發生了異常,failed = true,則執行finally中cancelAcquire(Node)方法,取消當前節點中執行緒的排程。
上面說了分析了這麼多,只說了執行緒間的獲取資源時候的同步問題,那麼執行緒間的共同作業在哪裡體現呢?答案就是在acquireQueued(Node,int)方法中的finally塊中,執行緒在阻塞獲取共用資源的時候發生了異常,就會執行此方法將此節點從同步佇列中出隊,下面我們來分析下cancelAcquire(Node)方法:
cancelAcquire(Node)方法:
1 private void cancelAcquire(Node node) { 2 //當前節點為空,則說明當前執行緒永遠不會被排程到了,所以直接返回 3 if (node == null) { 4 return; 5 } 6 7 /** 8 * 接下來將點前Node節點從同步佇列出隊,主要做以下幾件事: 9 * 1、將當前節點不與任何執行緒繫結,設定當前節點為Node.CANCELLED狀態; 10 * 2、將當前取消節點的前置非取消節點和後置非取消節點"連結"起來; 11 * 3、如果前置節點釋放了鎖,那麼當前取消節點承擔起後續節點的喚醒職責。 12 */ 13 14 //1、取消當前節點與執行緒的繫結 15 node.thread = null; 16 17 //2、找到當前節點的有效前繼節點pred 18 Node pred = node.prev; 19 while (pred.waitStatus > 0) { 20 //為什麼雙向連結串列從後往前遍歷呢?而不是從前往後遍歷呢? 21 node.prev = pred = pred.prev; 22 } 23 //用作CAS操作時候的條件判斷需要使用的值 24 Node predNext = pred.next; 25 26 //3、將當前節點設定為取消狀態 27 node.waitStatus = Node.CANCELLED; 28 29 /** 30 * 接下來就需要將當前取消節點的前後兩個有效節點"連結"起來了,"達成讓當前node節點出隊的目的"。 31 * 這裡按照node節點在同步佇列中的不同位置分了三種情況: 32 * 1、node節點是同步佇列的尾節點tail; 33 * 2、node節點既不是同步佇列頭結點head的後繼節點,也不是尾節點tail; 34 * 3、node節點是同步佇列頭結點head的後繼節點; 35 */ 36 37 //1、node是尾節點,並且執行過程中沒有並行,直接將pred設定為同步佇列的tail 38 if (node == tail && compareAndSetTail(node, pred)) { 39 /* 40 * 此時pred已經設定為同步佇列的tail,需要通過CAS操作,將pred的next指向null,沒有節點再參照node,就完成了node節點的出隊42 */ 43 compareAndSetNext(pred, predNext, null); 44 }else { 45 /* 46 * 2、node不是尾節點,也不是頭結點head的後繼節點,那麼當前節點node出隊以後,node的有效前繼結點pred, 47 * 就有義務在它自身釋放資源的時候,喚醒node的有效後繼節點successor,即將pred的狀態設定為Node.SIGNAL; 48 */ 49 int ws; 50 //能執行到這裡,說明當前node節點不是head的後繼節點,也不是同步佇列tail節點 51 if (pred != head && 52 ((ws = pred.waitStatus) == Node.SIGNAL || 53 //前繼節點狀態雖然有效但不是SIGNAL,採用CAS操作設定為SIGNAL確保後繼有效節點可以被喚醒 54 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 55 pred.thread != null) { 56 Node next = node.next; 57 //只負責喚醒有效後繼節點 58 if (next != null && next.waitStatus <= 0) { 59 /** 60 * 下面這段程式碼相當於將pred-->next,我們提到這個同步佇列是個雙向佇列,那麼pred<--next這是誰執行的呢? 61 * 答案是其他執行緒:其它執行緒在後序的獲取共用資源在同步佇列中阻塞的時候,呼叫shouldParkAfterFailedAcquire()方法, 62 * 從後向前遍歷佇列,尋找能喚醒它的有效前繼節點,當找到node的時候,因為它的狀態已經是Node.CANCELLED,所以會忽略node節點, 63 * 直到遍歷到有效前繼節點pred,將next.prev執行pred,即next--->pred,沒有節點再參照node節點,所以node節點至此才完成出隊。 64 */ 65 compareAndSetNext(pred, predNext, next); 66 } 67 }else { 68 //3、說明node節點是同步佇列head的後繼節點,呼叫unparkSuccessor(Node)喚醒其他執行緒,達到讓當前node"出隊"。 69 unparkSuccessor(node); 70 } 71 72 node.next = node;//help GC 73 } 74 }
這個方法的中的註釋寫的很詳細了,執行緒間的共同作業主要體現在第65行的程式碼中,原因也在註釋中寫明瞭,就不再贅述了。
還有一個非常關鍵的問題就是:為什麼我們在遍歷同步佇列的時候是從尾部向前遍歷,而不是從頭部向尾部遍歷呢?我們可以回過頭去看看入隊時候的enq(Node)方法:
關鍵在11行-14行這部分程式碼中,11行保證了多執行緒環境下,採用自旋可以將當前執行緒順利地加入到同步佇列的尾部。
假如有執行緒A執行了滿足了if條件,成功將執行緒A放入了tail節點,還未執行到12行,t.next = null,此時發生了執行緒切換執行B執行緒,B執行緒也執行了此方法,並且執行完畢,尾插法會將B執行緒節點追加到A執行緒節點之後,這時候又有個C執行緒執行了遍歷操作,假設從隊頭向隊尾遍歷,遍歷到A節點時,那麼可能會出現t.next = null這種情況,停止遍歷,漏掉B執行緒節點的情況,而採用從同步佇列的尾部向頭部遍歷則可以避免這個問題。
下個問題就是unparkSuccessor(node)方法的原理是什麼呢?
unparkSuccessor(node)方法:
1 private void unparkSuccessor(Node node) { 2 /* 3 * If status is negative (i.e., possibly needing signal) try 4 * to clear in anticipation of signalling. It is OK if this 5 * fails or if status is changed by waiting thread. 6 */ 7 //在這裡,這個節點其實是同步佇列的頭結點,頭結點喚醒後繼節點之後,使命就完成了,所以應該將其狀態置為0 8 int ws = node.waitStatus; 9 if (ws < 0) 10 compareAndSetWaitStatus(node, ws, 0); 11 12 /* 13 * Thread to unpark is held in successor, which is normally 14 * just the next node. But if cancelled or apparently null, 15 * traverse backwards from tail to find the actual 16 * non-cancelled successor. 17 */ 18 Node s = node.next; 19 //因為s.next相當於從同步佇列的頭部遍歷所以可能會出現s == null的情況,上面分析過原因,不再贅述了。 20 if (s == null || s.waitStatus > 0) { 21 s = null; 22 //從同步佇列的尾部向前遍歷,找到當前node節點(頭結點)的最近的有效後繼節點 23 for (Node t = tail; t != null && t != node; t = t.prev) 24 if (t.waitStatus <= 0) 25 s = t; 26 } 27 28 /** 29 * 找到最近的有效後繼節點,則喚醒後繼節點中的執行緒在parkAndCheckInterrupt()方法上的阻塞,去嘗試競爭共用資源, 30 * 這就體現了執行緒之間的共同作業,而在這個競爭的過程中也會忽略這個Node.CANCELLED狀態的節點,這當前node節點也就放棄了競爭共用資源的機會,相當於出隊了。 31 */ 32 if (s != null) 33 LockSupport.unpark(s.thread); 34 }
以上就是對unparksuccessor(Node)方法的簡單分析了。再回過頭來,我們在cancelAcquire(Node)將當前要取消Node按照位置關係分為了三種,為什麼我們會忽略head位置呢?
從setHead()的實現以及所有呼叫的地方可以看出,head指向的節點必定是拿到鎖(或是競爭資源)的節點,而head的後繼節點則是有資格爭奪鎖的節點,我們不在需要甚至喚醒條件了。再後續的節點,就是阻塞著的了。head指向的節點,曾經關聯的執行緒必定已經獲取到資源,在執行了,所以head無需再關聯到該執行緒了。head所指向的節點,也無需再參與任何的競爭操作了。現在再來看node出隊時的分類,就好理解了。head既然不會參與任何資源競爭了,自然也就和cancelAquire()無關了。
仔細分析這個acquire()方法流程非常複雜,找了個一張網上一個博主畫的流程圖非常棒,這裡借鑑一下:
好了文章就寫到這裡了,鑑於水平有限,有說的不對的地方歡迎大家批評指正。
參考文章地址:
1、https://blog.csdn.net/weixin_38106322/article/details/107121149
2、https://www.jianshu.com/p/01f2046aab64
3、https://blog.csdn.net/foxException/article/details/108917338
本文來自部落格園,作者:一隻烤鴨朝北走,僅用於技術學習,所有資源都來源於網路,部分是轉發,部分是個人總結。歡迎共同學習和轉載,轉載請在醒目位置標明原文。如有侵權,請留言告知,及時撤除。轉載請註明原文連結:https://www.cnblogs.com/wha6239/p/17122948.html