摘要:當你使用java實現一個執行緒同步的物件時,一定會包含一個問題:你該如何保證多個執行緒存取該物件時,正確地進行阻塞等待,正確地被喚醒?
本文分享自華為雲社群《JUC中的AQS底層詳細超詳解,剖析AQS設計中所需要考慮的各種問題!》,作者: breakDawn 。
當你使用java實現一個執行緒同步的物件時,一定會包含一個問題:
你該如何保證多個執行緒存取該物件時,正確地進行阻塞等待,正確地被喚醒?
關於這個問題,java的設計者認為應該是一套通用的機制
因此將一套執行緒阻塞等待以及被喚醒時鎖分配的機制稱之為AQS
全稱 AbstractQuenedSynchronizer
中文名即抽象的佇列式同步器 。
基於AQS,實現了例如ReentenLock之類的經典JUC類。
AQS中的資源是一個int值,而且是volatile的,並提供了3個方法給子類使用:
private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } // cas方法 compareAndSetState(int oldState, int newState);
如果state上限只有1,那麼就是獨佔模式Exclusive,例如 ReentrantLock
如果state上限大於1,那就是共用模式Share,例如 Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
對外暴露的getter/setter方法,是走不了CAS的。而且setter/getter沒有被synchronized修飾。所以必須要volatile,保證可見性
這樣基於AQS的實現可以直接通過getter/setter操作state變數,並且保證可見性,也避免重排序帶來的影響。比如CountDownLatch,ReentrantReadWriteLock,Semaphore都有體現(各種getState、setState)
volatile的state成員有一個問題,就是如果是複合操作的話不能保證複合操作的原子性
因此涉及 state增減的情況,採用CAS
如果是state設定成某個固定值,則使用setState
這個佇列的目的是為了公平鎖的實現
即為了保證先到先得,要求每個執行緒封裝後的Node按順序拼接起來。
不是的,本質上是一個連結串列式的佇列
因此核心在於連結串列節點Node的定義
除了比較容易想到的prev和next指標外
還包含了該節點內的執行緒
以及 waitStatus 等待狀態
4種等待狀態如下:
入隊過程可能引發衝突
因此會用CAS保障入隊安全。
private Node enq(final Node node) { //多次嘗試,直到成功為止 for (;;) { Node t = tail; //tail不存在,設定為首節點 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { //設定為尾節點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
一旦有節點出隊,說明有執行緒釋放資源了,隊頭的等待執行緒可以開始嘗試獲取了。
於是首節點的執行緒釋放同步狀態後,將會喚醒它的後繼節點(next)
而後繼節點將會在獲取同步狀態成功時將自己設定為首節點
注意在這個過程是不需要使用CAS來保證的,因為只有一個執行緒能夠成功獲取到同步狀態
AQS使用的設計模式是模板方法模式。
具體程式碼如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 發現中斷過,則觸發中斷異常 selfInterrupt(); }
即AQS抽象基礎類別AbstractQueuedSynchronizer給外部呼叫時,都是調的acquire(int arg)方法。這個方法的內容是寫死的。
而acquire中,需要呼叫tryAcquire(arg), 這個方法是需要子類實現的,作用是判斷資源是否足夠獲取arg個
(下面部分程式碼註釋選自: (2條訊息) AQS子類的tryAcquire和tryRelease的實現_Mutou_ren的部落格-CSDN部落格_aqs tryacquire )
這裡暫時只談論一種容易理解的tryAcuire實現,其他附加特性的tryAcquire先不提。
裡面主要就做這幾件事:
protected final boolean tryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); // state==0代表當前沒有鎖,可以進行獲取 if (c == 0) { // 非公平才有的判斷,會判斷是否還有前驅節點,直接自己為頭節點了或者同步佇列空了才會繼續後面的鎖的獲取操作 if (!hasQueuedPredecessors() //CAS設定state為acquires,成功後標記exclusiveOwnerThread為當前執行緒 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 當前佔有執行緒等於自己,代表重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 出現負數,說明溢位了 if (nextc < 0) // throw new Error("Maximum lock count exceeded"); // 因為是重入操作,可以直接進行state的增加,所以不需要CAS setState(nextc); return true; } return false; }
當獲取資源失敗,會進行addWaiter(Node.EXCLUSIVE), arg)。
目的是建立一個等待節點Node,並新增到等待佇列
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 通過CAS競爭隊尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 競爭隊尾失敗,於是進行CAS頻繁迴圈競爭隊尾 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
並在 "處於頭節點時嘗試獲取資源->睡眠->喚醒「中迴圈。
當已經跑完任務的執行緒釋放資源時,會喚醒之前阻塞的執行緒。
當被喚醒後,就會檢查自己是不是頭節點,如果不是,且認為可以阻塞,那就繼續睡覺去了
(下面程式碼註釋部分選自AQS(acquireQueued(Node, int) 3)–佇列同步器 - 小窩蝸 - 部落格園 (http://cnblogs.com) )
final boolean acquireQueued(final Node node, int arg) { // 標識是否獲取資源失敗 boolean failed = true; try { // 標識當前執行緒是否被中斷過 boolean interrupted = false; // 自旋操作 for (;;) { // 獲取當前節點的前繼節點 final Node p = node.predecessor(); // 如果前繼節點為頭結點,說明排隊馬上排到自己了,可以嘗試獲取資源,若獲取資源成功,則執行下述操作 if (p == head && tryAcquire(arg)) { // 將當前節點設定為頭結點 setHead(node); // 說明前繼節點已經釋放掉資源了,將其next置空,好讓虛擬機器器提前回收掉前繼節點 p.next = null; // help GC // 獲取資源成功,修改標記位 failed = false; // 返回中斷標記 return interrupted; } // 若前繼節點不是頭結點,或者獲取資源失敗, // 則需要判斷是否需要阻塞該節點持有的執行緒 // 若可以阻塞,則繼續執行parkAndCheckInterrupt()函數, // 將該執行緒阻塞直至被喚醒 // 喚醒後會檢查是否已經被中斷,若返回true,則將interrupted標誌置於true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 最終獲取資源失敗,則當前節點放棄獲取資源 if (failed) cancelAcquire(node); } }
該方法不會直接阻塞執行緒,因為一旦執行緒掛起,後續就只能通過喚醒機制,中間還發生了核心態使用者態切換,消耗很大。
因此會先不斷確認前繼節點的實際狀態,在只能阻塞的情況下才會去阻塞。
並且會過濾掉cancel的執行緒節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取前繼節點的等待狀態 int ws = pred.waitStatus; // 如果等待狀態為Node.SIGNAL(-1),則直接返回true即可以阻塞 // 因為這說明前繼節點完成資源的釋放或者中斷後,會主動喚醒後繼節點的(這也即是signal訊號的含義),因此方法外面不用再反覆CAS了,直接阻塞吧 if (ws == Node.SIGNAL) return true; // 如果前繼節點的等待值大於0即CANCELLED(1),說明前繼節點的執行緒發生過cancel動作 // 那就繼續往前遍歷,直到當前節點的前繼節點的狀態不為cancel if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 前繼節點的等待狀態不為SIGNAL(-1),也不為Cancel(1) // 那麼只能是PROPAGATE(-3)或者CONDITION(-2)或者INITIAL(0) // 直接設定成SIGNAL,下一次還沒CAS成功,就直接睡覺了 // 因此在前面所有節點沒辯護的情況下, 最多一次之後就會返回true讓外面阻塞 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
使用LockSupport.park來阻塞當前這個物件所在的執行緒
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 確認是否是中斷導致的park結束,並清除中斷標記 return Thread.interrupted(); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }
如果還要深挖底層實現原理,可以詳細見該連結
簡而言之,是用mutex和condition保護了一個_counter的變數,當park時,這個變數置為了0,當unpark時,這個變數置為1。
底層用的C語言的pthread_mutex_unlock、pthread_cond_wait 、pthread_cond_signal ,但是針對了mutex和_cond兩個變數進行加鎖。
對執行緒呼叫 t1.interrupt();時
會導致 LockSupport.park() 阻塞的執行緒重新被喚醒
即有兩種喚醒情況: 被前置節點喚醒,或者被外部中斷喚醒
這時候要根據呼叫的acuire型別決定是否在中斷髮生時結束鎖的獲取。
上面介紹的是不可中斷鎖。
在parkAndCheckInterrupt中,當park結束阻塞時時,使用的是 Thread.interrupted() 而不是 .isInterrupted() 來返回中斷狀態
因為前者會返回執行緒當前的中斷標記狀態同時清除中斷標誌位(置為false)
外層CAS迴圈時, 就不會讓執行緒受中斷標記影響,只是記錄一下是否發生過中斷
當獲取鎖成功後,如果發現有過執行緒中斷,則會觸發中斷異常,
之後便由獲取鎖的呼叫者自己決定是否要處理執行緒中斷。像下面這樣:
reentrantLock.lock(); try { System.out.println("t1"); TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } finally { reentrantLock.unlock(); }
那麼另一種情況就是可中斷鎖了。
ReentranLock有一個lockInterruptibly()方法就是這種情況
執行緒被喚醒時,如果發現自己被中斷過,就會直接拋異常而不是繼續獲取鎖
因此如果你的執行緒對中斷很敏感,那麼就是用可中斷鎖,及時響應。
如果不敏感,也要注意處理中斷異常。
首先AQS提供的模板方法為release方法。
核心邏輯就是對資源進行嘗試性釋放
如果成功,就喚醒等待佇列中的第一個頭節點
public final boolean release(int arg) { // 是否釋放成功,tryRelease是子類要實現的方法 if (tryRelease(arg)) { Node h = head; // 判斷頭節點是否正在阻塞中,是的話喚醒 if (h != null && h.waitStatus != 0) // 喚醒頭節點 unparkSuccessor(h); return true; } return false; }
看一下ReteenLock中的tryRelease實現
就是減一下資源值。
當資源值清零,則說明可以解除了對當前點的佔用
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; // 設定當前佔用執行緒為null setExclusiveOwnerThread(null); } // 不需要CAS,因為只有持有鎖的人才能做釋放,不擔心競爭 setState(c); return free; }
以ReteenLock為例,它內部tryAcquire有兩種同步器的實現
公平同步器和非公平同步器都是ReentrantLock中定義的一個static內部類
ReentrantLock根據設定的不同,使用這2個同步器做資源的獲取和同步操作
他們二者的提供的lock操作,本質上就是AQS的acquire(1)
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); }
二者在公平和非公平的實現區別上,就是喚醒執行緒後,只有等待佇列的隊頭節點才會嘗試競爭。
而非公平鎖是隻要喚醒了就可以嘗試競爭。
因此核心區別在於hasQueuedPredecessors方法!
非公平鎖可能引發「飢餓」,即一個執行緒反覆搶佔獲取,而其他執行緒一直拿不到。
而公平鎖不存在飢餓,只要排上隊了就一定能拿到
非公平鎖的平均效能比公平鎖要高, 因為非公平鎖中所有人都可以CAS搶佔,如果同步塊的時間非常短,那麼可能所有人都不需要阻塞,減少CPU喚醒執行緒的開銷,整體的吞吐效率會高點,CPU也不必取喚醒所有執行緒,會減少喚起執行緒的數量。
效能測試中公平鎖的耗時是非公平鎖的94.3倍, 總切換次數是133倍
預設是非公平的,原因就是上文考慮的效能差距過大問題, 因此公平鎖只能用於特定對效能要求不高且飢餓發生概率不大的場景中。
先實現了一個靜態內部類Sync
和上面的RLock類一個區別在於需要state初始化值,不一定為1
Sync(int permits) { setState(permits); }
再繼承實現了FairSync和NoFairSync
使用CAS實現值的增加或者減少
公平/非公平的區別同樣是hasQueuedPredecessors的判斷
protected int tryAcquireShared(int acquires) { for (;;) { // 隊頭判斷,公平鎖核心 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; // 號誌不足,直接返回負數 if (remaining < 0 || // 能搶成功,返回修改後的值,搶失敗則for迴圈繼續 compareAndSetState(available, remaining)) return remaining; } }
通過current == getExclusiveOwnerThread()來判斷並進行非CAS的setState操作
if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 出現負數,說明溢位了 if (nextc < 0) // throw new Error("Maximum lock count exceeded"); // 因為是重入操作,可以直接進行state的增加,所以不需要CAS setState(nextc); return true; }
注意處理重入問題時,如果是獨佔鎖,是可以直接setState而不需要CAS的,因為不會競爭式地重入!
ReentrantLock釋放時,也會處理重入,關鍵點就是對getState() - release後的處理,是否返回true或者false
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 只有資源數為0才會解鎖 // 才算釋放成功,否則這鎖還是佔住了 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
AQS提供的方法中帶有Nanos字尾的方法就是支援超時中斷的方法。
核心邏輯就是每次阻塞前,確認nanosTimeout是否已經超時了。
每次喚醒時,將nanosTimeout減去阻塞所花的時間,重新確認,並修改lastTime
關鍵部分見下圖
首先這個值是寫死的1000L即1000納秒
1000納秒是個非常小的數位,而小於等於1000納秒的超時等待,無法做到十分的精確,那麼就不要使用這麼短的一個超時時間去影響超時計算的精確性,所以這時執行緒不做超時等待,直接做自旋就好了。