在java.util.concurrent包中,我們經常會使用ReentrantLock,CyclicBarrier等工具類,但是我們往往對其內部的實現原理卻並不知曉。
本篇文章主要對上述工具類的核心實現AQS進行剖析,分析原理可以讓我們學習到大神的程式碼設計思維。
文章將從一下幾個方面分析:
AbstractQueuedSynchronizer類就是我們通常說的AQS,抽象的佇列同步器。
其實我們學過作業系統原理都知道,所謂的同步,指的是多執行緒場景下通過某種機制,保證某段程式碼執行是執行緒獨享的,
我們把這段程式碼叫同步塊,而把這種機制叫同步。
在JAVA中,傳統的方式是使用synchronized關鍵字來實現同步,那麼其底層是基於C++實現的ObjectMonitor。
今天我們討論的AQS是JDK1.5之後,提供的一個能實現同步功能的抽象類。
通過該類的註釋,我們可以瞭解到其內部採用了FIFO佇列的資料結構來實現,互斥場景的資源申請和釋放的實現如下所示:
Acquire: while (!tryAcquire(arg)) { enqueue thread if it is not already queued; possibly block current thread; } Release: if (tryRelease(arg)) unblock the first queued thread;
而tryAquire()和tryRelease()方法都是需要子類去實現的。
換句話說,如果要使用AQS,那麼只需要繼承,然後實現如下方法來自定義資源獲取和釋放的邏輯就行了。
FIFO佇列的節點使用內部類Node來描述:
static final class Node { // 節點狀態 volatile int waitStatus; // 上一個節點 volatile Node prev; // 下一節點 volatile Node next; // 需要排隊的執行緒 volatile Thread thread; }
其他程式碼我先省略。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 雙向連結串列頭節點 private transient volatile Node head; // 雙向連結串列為節點 private transient volatile Node tail; // 資源 private volatile int state; }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcuire()方法可以理解成獲取資源,由子類實現,我們不關注。
我把程式碼寫成下面這樣,可能好理解一些:
public final void acquire(int arg) { if(tryAcquire(arg)) { return; } Node n = addWaiter(Node.EXCLUSIVE); if(acquireQueued(n, arg)) selfInterrupt(); }
public final void acquire(int arg) { // 如果獲取資源成功,直接返回 if(tryAcquire(arg)) { return true; } // 未獲取到資源,將自己封裝成一個node新增到佇列尾部 Node n = addWaiter(Node.EXCLUSIVE); // 有點複雜,下面慢慢分析 if(acquireQueued(n, arg)) { // 自我中斷 先不關注。 selfInterrupt(); } }
起碼,我們應該知道一點,該方法一旦返回了,那麼就意味著可以進入同步塊程式碼執行了。
private Node addWaiter(Node mode) { // 以互斥模式建立一個node,waitStatus是0 Node node = new Node(Thread.currentThread(), mode); // 其實該段程式碼跟下面的enq方法差不多 Node pred = tail; // 如果該佇列已經有為節點 if (pred != null) { // 當前node上一個node指向為尾節點 node.prev = pred; // cas修改尾節點 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 死迴圈保證新增成功 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 如果當前佇列沒有節點,建立一個虛擬節點作為頭和尾,該節點的thread == null, waitStatus是0 // 通過cas操作保證只有一個執行緒修改成功 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
其實上述方法就是保證了並行情況下node一定能正確加入到node中,而且如果是空連結串列,會增加一個虛擬的head節點。
圖解一下:
(建立head虛擬節點)
將新節點指向tail
通過cas操作修改tail指向新節點
如果修改成功,將修改前的tail的next指向新節點
// 看這個方法的時候我建議不要關注其中的臨時變數 // 我們只要知道,這個方法裡面有個死迴圈,不管怎樣,只有return了,才能執行業務中定義的同步程式碼塊。 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; // 唯一的返回點 return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } // 我把程式碼改寫成下面這樣 可能會好理解一些 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); // 只有前繼節點是head的時候具有獲取資源的資格,如果獲取成功則直接將當前node設定成head if (p == head && tryAcquire(arg)) { // 可能在排隊後未阻塞前執行,也可能在阻塞被喚醒後執行 setHead(node); p.next = null; // help GC failed = false; // 唯一的返回點 return interrupted; } // 判斷是否該阻塞自己 if (!shouldParkAfterFailedAcquire(p, node)){ continue; } // 進行阻塞,並且在被喚醒之後返回執行緒的中斷狀態 if(!parkAndCheckInterrupt()) { continue; } interrupted = true; } } finally { // 先別關注。 if (failed) cancelAcquire(node); } }
總之,一旦該方法返回了,就意味著執行緒獲取資源成功了。
下面圖解獲取資源成功後,做的修改:
當然這裡的waitStatus不一定是0。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 如果上一節點狀態是-1,直接返回true if (ws == Node.SIGNAL) // 返回,並觸發外層阻塞執行緒 return true; if (ws > 0) { // 如果上一個節點狀態大於0,其實就是1表示被取消了 // 往前找waitStatus正常的節點 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // cas修改上一個節點為狀態為-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 返回進入下一個迴圈 return false; }
其實邏輯很簡單,就是修改自己的prev指向往前尋找的有效節點,並在阻塞前,將prev指向的有效節點waitStatus設定為-1。
問題來了?這個程式碼沒有並行問題嗎?
do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node;
答案是沒有並行問題,因為每個節點都是往前尋找,理論上講每個執行緒遍歷的節點不一樣。
我們來圖解一下這個迴圈修改的過程:
假設執行緒A(waitStatus = 0),執行緒B(waitStatus = 1),執行緒C(waitStatus = 0 為當前執行緒)
下圖示識了迴圈往前探測並修改參照關係
下圖就是修改了pred節點的waitStatus = -1 (下一次迴圈到來的時候)
此時中間執行緒B的節點其實沒有參照再指向他了。
如果成功修改執行緒A的waitStatus為-1之後,下一次迴圈到來,該方法就返回了true,執行緒就阻塞了。
其實看到這裡,我們可以將虛擬的head節點就當做是正在使用資源的執行緒表示(個人觀點哈)。
互斥資源申請的原始碼分析就結束了。
做一個小總結:
public final void acquire(int arg) { // 如果爭搶資源成功直接返回直接業務的同步程式碼塊 if (tryAcquire(arg)) { return; } // 迴圈+cas保證向佇列中新增當前執行緒的node成功 Node n = addWaiter(Node.EXCLUSIVE); // 死迴圈,爭搶資源,或者阻塞,或者喚醒之後繼續爭搶資源,直到搶資源成功後返回。 if(acquireQueued(n, arg)) { selfInterrupt(); } // 未被中斷的搶到資源 }
我們直接分析release()方法:
public final boolean release(int arg) { if (tryRelease(arg)) { // 一旦釋放資源,此處就需要考慮並行問題了 Node h = head; // head節點是虛擬節點,不可能是取消狀態所以這裡的判斷可以理解為 // 頭結點不為空而且頭結點的waitStatus = -1執行unparkSuccessor方法 if (h != null && h.waitStatus != 0) unparkSuccessor(h); // h == null 或者 h.waitStatus = 0說明沒有後繼節點需要喚醒, // 如果此時正好head後面一個node正在試圖修改head的狀態改成-1是能改成功的 // 但是由於之前分析的acquireQueued方法是一個死迴圈,哪怕head被修改成-1, // 但是由於該回圈會先搶鎖所以也就不存線上程改了狀態會park的問題。 // h == null說明並無執行緒參與競爭 return true; } return false; }
其實關鍵就是unparkSuccessor()方法
private void unparkSuccessor(Node node) { // 進入該方法說明資源已經被釋放了。 int ws = node.waitStatus; // 如果ws小於0,修改為0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 傳入node就是head // s指向head下一節點 Node s = node.next; // 如果沒有後繼節點說明有執行緒已經搶到資源 // 如果後繼節點被取消了 if (s == null || s.waitStatus > 0) { // 假設沒有節點需要喚醒 s = null; // 從後往前找,找到距離head最近的節點喚醒 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
其實執行該方法的時候是存在並行的情況的。
我理解這個地方從後往前找是為啥呢?能不能從前往後找?
這個問題,我也不知道。。。。