歡迎關注:王有志
期待你加入Java人的提桶跑路群:共同富裕的Java人
今天來和大家聊聊Condition
,Condition
為AQS「家族」提供了等待與喚醒的能力,使AQS"家族"具備了像synchronized
一樣暫停與喚醒執行緒的能力。我們先來看兩道關於Condition
的面試題目:
Condition
和Object
的等待與喚醒有什麼區別?Condition
佇列?接下來,我們就按照「是什麼」,「怎麼用」和「如何實現」的順序來揭開Condition
的面紗吧。
Condition
是Java中的介面,提供了與Object#wait
和Object#notify
相同的功能。Doug Lea在Condition
介面的描述中提到了這點:
Conditions (also known as condition queues or condition variables) provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true.
來看Condition
介面中提供了哪些方法:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
Condition
只提供了兩個功能:等待(await)和喚醒(signal),與Object
提供的等待與喚醒時相似的:
public final void wait() throws InterruptedException;
public final void wait(long timeoutMillis, int nanos) throws InterruptedException;
public final native void wait(long timeoutMillis) throws InterruptedException;
@HotSpotIntrinsicCandidate
public final native void notify();
@HotSpotIntrinsicCandidate
public final native void notifyAll();
喚醒功能上,Condition
與Object
的差異並不大:
Condition#signal
\(\approx\)Object#notify
Condition#signalAll
\(=\)Object#notifyAll
多個執行緒處於等待狀態時,Object#notify()
是「隨機」喚醒執行緒,而Condition#signal
則由具體實現決定如何喚醒執行緒,如:ConditionObject
喚醒的是最早進入等待的執行緒,但兩個方法均只喚醒一個執行緒。
等待功能上,Condition
與Object
的共同點是:都會釋放持有的資源,Condition
釋放鎖,Object
釋放Monitor,即進入等待狀態後允許其他執行緒獲取鎖/監視器。主要的差異體現在Condition
支援了更加豐富的場景,通過一張表格來對比下:
Condition 方法 |
Object 方法 |
解釋 |
---|---|---|
Condition#await() |
Object#wait() |
暫停執行緒,丟擲執行緒中斷異常 |
Condition#awaitUninterruptibly() |
/ | 暫停執行緒,不丟擲執行緒中斷異常 |
Condition#await(time, unit) |
Object#wait(timeoutMillis, nanos) |
暫停執行緒,直到被喚醒或等待指定時間後,超時後自動喚醒返回false,否則返回true |
Condition#awaitUntil(deadline) |
/ | 暫停執行緒,直到被喚醒或到達指定時間點,超時後自動喚醒返回false,否則返回true |
Condition#awaitNanos(nanosTimeout) |
/ | 暫停執行緒,直到被喚醒或等待指定時間後,返回值表示被喚醒時的剩餘時間(nanosTimeout-耗時),結果為負數表示超時 |
除了以上差異外,Condition
還支援建立多個等待佇列,即同一把鎖擁有多個等待佇列,執行緒在不同佇列中等待,而Object
只有一個等待佇列。《Java並行程式設計的藝術》中也有一張類似的表格,放在這裡供大家參考:
Tips:
Condition
的實現部分,下文通過AQS中的ConditionObject
詳細解釋。既然Condition
與Object
提供的等待與喚醒功能相同,那麼它們的用法是不是也很相似呢?
與呼叫Object#wait
和Object#notifyAll
必須處於synchronized
修飾的程式碼中一樣(獲取Monitor),呼叫Condition#await
和Condition#signalAll
的前提是要先獲取鎖。但不同的是,使用Condition
前,需要先通過鎖去建立Condition
。
以ReentrantLock
中提供的Condition
為例,首先是建立Condition
物件:
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
然後是獲取鎖並呼叫await
方法:
new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock.unlock();
}
最後,通過呼叫singalAll
喚醒全部阻塞中的執行緒:
new Thread(() -> {
lock.lock();
condition.signalAll();
lock.unlock();
}
作為介面Condition
非常慘,因為在Java中只有AQS中的內部類ConditionObject
實現了Condition
介面:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
}
static final class Node {
// 省略
}
}
ConditionObject
只有兩個Node
型別的欄位,分別是鏈式結構中的頭尾節點,ConditionObject
就是通過它們實現的等待佇列。那麼ConditionObject
的等待佇列起到了怎樣的作用呢?是類似於AQS中的排隊機制嗎?帶著這兩個問題,我們正是開始原始碼的分析。
Condition
介面中定義了4個執行緒等待的方法:
void await() throws InterruptedException
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
方法雖然很多,但它們之間的差異較小,只體現在時間的處理上,我們看其中最常用的方法:
public final void await() throws InterruptedException {
// 執行緒中斷,丟擲異常
if (Thread.interrupted()) {
throw new InterruptedException();
}
// 註釋1:加入到Condition的等待佇列中
Node node = addConditionWaiter();
// 註釋2:釋放持有鎖(呼叫AQS的release)
int savedState = fullyRelease(node);
int interruptMode = 0;
// 註釋3:判斷是否在AQS的等待佇列中
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 中斷時退出方法
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
// 加入到AQS的等待佇列中,呼叫AQS的acquireQueued方法
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
// 斷開與Condition佇列的聯絡
if (node.nextWaiter != null) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
}
註釋1的部分,呼叫addConditionWaiter
方法新增到Condition
佇列中:
private Node addConditionWaiter() {
// 判斷當前執行緒是否為持有鎖的執行緒
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 獲取Condition佇列的尾節點
Node t = lastWaiter;
// 斷開不再位於Condition佇列的節點
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 建立Node.CONDITION模式的Node節點
Node node = new Node(Node.CONDITION);
if (t == null) {
// 佇列為空的場景,將node設定為頭節點
firstWaiter = node;
} else {
// 佇列不為空的場景,將node新增到尾節點的後繼節點上
t.nextWaiter = node;
}
// 更新尾節點
lastWaiter = node;
return node;
}
可以看到,Condition
的佇列是一個樸實無華的雙向連結串列,每次呼叫addConditionWaiter
方法,都會加入到Condition
佇列的尾部。
註釋2的部分,釋放執行緒持有的鎖,同時移出AQS的佇列,內部呼叫了AQS的release
方法:
=final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState)) {
return savedState;
}
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
因為已經分析過AQS的release
方法和ReentrantLock
實現的tryRelease
方法,這裡我們就不過多贅述了。
註釋3的部分,isOnSyncQueue
判斷當前執行緒是否在AQS的等待佇列中,我們來看此時存在的情況:
isOnSyncQueue
返回false
,即執行緒不在AQS的佇列中,進入自旋,呼叫LockSupport#park
暫停執行緒;isOnSyncQueue
返回true
,即執行緒在AQS的佇列中,不進入自旋,執行後續邏輯。結合註釋1和註釋2的部分,Condition#await
的實現原理了就很清晰了:
Condition
與AQS分別維護了一個等待佇列,而且是互斥的,即同一個節點只會出現在一個佇列中;Condition#await
時,將執行緒新增到Condition
的佇列中(註釋1),同時從AQS佇列中移出(註釋2);Condition
佇列中,該執行緒需要被暫停,呼叫LockSupport#park
;基於以上的結論,我們已經能夠猜到喚醒方法Condition#signalAll
的原理了:
Condition
佇列中移出,並新增到AQS的佇列中;LockSupport.unpark
喚醒執行緒。至於這個猜想是否正確,我們接著來看喚醒方法的實現。
Tips:如果忘記了AQS中相關方法是如何實現的,可以回顧下《AQS的今生,構建出JUC的基礎》。
來看signal
和signalAll
的原始碼:
// 喚醒一個處於等待中的執行緒
public final void signal() {
if (!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 獲取Condition佇列中的第一個節點
Node first = firstWaiter;
if (first != null) {
// 喚醒第一個節點
doSignal(first);
}
}
// 喚醒全部處於等待中的執行緒
public final void signalAll() {
if (!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if (first != null) {
// 喚醒所有節點
doSignalAll(first);
}
}
兩個方法唯一的差別在於頭節點不為空的場景下,是呼叫doSignal
喚醒一個執行緒還是呼叫doSignalAll
喚醒所有執行緒:
private void doSignal(Node first) {
do {
// 更新頭節點
if ( (firstWaiter = first.nextWaiter) == null) {
// 無後繼節點的場景
lastWaiter = null;
}
// 斷開節點的連線
first.nextWaiter = null;
// 喚醒頭節點
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
// 將Condition的佇列置為空
lastWaiter = firstWaiter = null;
do {
// 斷開連結
Node next = first.nextWaiter;
first.nextWaiter = null;
// 喚醒當前頭節點
transferForSignal(first);
// 更新頭節點
first = next;
} while (first != null);
}
可以看到,無論是doSignal
還是doSignalAll
都只是將節點移出Condition
佇列,而真正起到喚醒作用的是transferForSignal
方法,從方法名可以看到該方法是通過「轉移」進行喚醒的,我們來看原始碼:
final boolean transferForSignal(Node node) {
// 通過CAS替換node的狀態
// 如果替換失敗,說明node不處於Node.CONDITION狀態,不需要喚醒
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
return false;
}
// 將節點新增到AQS的佇列的隊尾
// 並返回老隊尾節點,即node的前驅節點
Node p = enq(node);
int ws = p.waitStatus;
// 對前驅節點狀態的判斷
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) {
LockSupport.unpark(node.thread);
}
return true;
}
transferForSignal
方法中,呼叫enq
方法將node
重新新增到AQS的佇列中,並返回node
的前驅節點,隨後對前驅節點的狀態進行判斷:
Node.CANCELLED
狀態,前驅節點退出鎖的爭搶,node
可以直接被喚醒;Node.SIGNAL
,設定失敗時,直接喚醒node
。《AQS的今生,構建出JUC的基礎》中介紹了waitStatus
的5種狀態,其中Node.SIGNAL
狀態表示需要喚醒後繼節點。另外,在分析shouldParkAfterFailedAcquire
方法的原始碼時,我們知道在進入AQS的等待佇列時,需要將前驅節點的狀態更新為Node.SIGNAL
。
最後來看enq
的實現:
private Node enq(Node node) {
for (;;) {
// 獲取尾節點
Node oldTail = tail;
if (oldTail != null) {
// 更新當前節點的前驅節點
node.setPrevRelaxed(oldTail);
// 更新尾節點
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
// 返回當前節點的前驅節點(即老尾節點)
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
enq
的實現就非常簡單了,通過CAS更新AQS的佇列尾節點,相當於新增到AQS的佇列中,並返回尾節點的前驅節點。好了,喚醒方法的原始碼到這裡就結束了,是不是和我們當初的猜想一模一樣呢?
功能上,Condition
實現了AQS版Object#wait
和Object#notify
,用法上也與之相似,需要先獲取鎖,即需要在lock
與unlock
之間呼叫。原理上,簡單來說就是執行緒在AQS的佇列和Condition
的佇列之間的轉移。
假設有執行緒t已經獲取了ReentrantLock
,執行緒t1,t2和t3正在AQS的佇列中等待,我們可以得到這樣的結構:
如果執行緒t中呼叫了Condition#await
方法,執行緒t進入Condition
的等待佇列中,執行緒t1獲取ReentrantLock
,並從AQS的佇列中移出,結構如下:
如果執行緒t1中也執行了Condition#await
方法,同樣執行緒t1進入Condition
佇列中,執行緒t2獲取到ReentrantLock
,結構如下:
如果執行緒t2執行了Condition#signal
,喚醒Condition
佇列中的第一個執行緒,此時結構如下:
通過上面的流程,我們就可以得到執行緒是如何在Condition
佇列與AQS佇列中轉移的:
關於Condition
的內容到這裡就結束了,無論是理解,使用還是剖析原理,Condition
的難度並不高,只不過大家可能平時用得比較少,因此多少有些陌生。
最後,截止到文章釋出,我應該是把開頭兩道題目的題解寫完了吧~~
好了,今天就到這裡了,Bye~~