任何一個java物件都擁有一組定義在Object中的監視器方法——wait(),wait(long timeout),notify(),和notifyAll()方法,這些方法配合sync hronized同步關鍵字,可以實現等待/通知模式。Condition介面也提供了類似於Object的監視器方法,可以和Lock介面的實現配合實現等待/通知模式
Object監視器 | Condition | |
---|---|---|
前置條件 | 獲取物件的鎖 | Lock.lock()獲取,並且通過當前Lock.newCondtion()獲取Condition物件 |
等待佇列 | 一個 | 多個,一個lock 可以呼叫多次newCondtion()生成多個等待佇列 |
呼叫等待方法使當前執行緒放棄鎖進入等待佇列 | 是 | 是 |
等待時可以不響應中斷 | 不支援 | 支援 |
超時等待 | 支援 | 支援 |
等待到指定的絕對時間 | 不支援 | 支援 |
喚醒等待佇列中一個執行緒 | 支援 | 支援 |
喚醒所有等待佇列中的執行緒 | 支援 | 支援 |
方法名稱 | 方法描述 |
---|---|
void await() throws InterruptedException | 當前執行緒進入等待狀態直到被通知or中斷,當前執行緒結束await()時必然獲取到了鎖,那怕是在等待的時候被中斷也必須獲取鎖後響應中斷才可以返回 |
void awaitUninterruptibly() | 對中斷不敏感的等待,即使被中斷,該方法返回之前也只是補上中斷標誌位,同樣返回的時候必須獲取到鎖 |
long awaitNanos(long nanosTimeout) throws InterruptedException | 超時等待nanosTimeout 納秒,返回值是剩餘的時間,如果耗時h納秒被喚醒 那麼返回nanosTimeout-n,返回0 or 負數表示是超時退出,對中斷敏感 |
boolean await(long time, TimeUnit unit) throws InterruptedException | 使當前執行緒一直等待,直到它發出訊號或中斷,或者指定的等待時間過去。此方法在行為上等價於: awaitNanos(unit.toNanos(time)) > 0,支援指定時間單位 |
boolean awaitUntil(Date deadline) throws InterruptedException | 當前執行緒進入等待直到被喚醒,or中斷or到了指定的截至日期,在指定截至日期前被通知返回true,反之false |
void signal() | 喚醒一個等待執行緒。如果有任何執行緒在此條件下等待,則選擇一個用於喚醒。然後,該執行緒必須在從等待返回之前重新獲取鎖,呼叫此方法,通常要求當前執行緒持有與此條件關聯的鎖 |
void signalAll() | 喚醒所有等待的執行緒。如果有任何執行緒正在等待這種情況,那麼它們都會被喚醒。每個執行緒必須重新獲取鎖才能從等待返回。 當呼叫此方法時,通常要求當前執行緒持有與此條件關聯的鎖。 |
Condition的實現一般都是AQS中的內部類ConditionObject,下面Condition都是指AQS中的ConditionObject
await方法會當前獲取鎖的執行緒從同步佇列移動到等待佇列,並且完全釋放到鎖,掛起當前執行緒直到被signal或者被中斷,並且必須獲取到鎖才可以返回,如果在等待的過程中被中斷還會根據中斷在signal之前或者signal選擇是補上中斷標識還是丟擲中斷異常
中斷對此方法十分關鍵,也是最難理解的部分,後續會解釋doug lea是如何實現
public final void await() throws InterruptedException {
//如果進入方法就已經被中斷那麼復位中斷標識並且丟擲中斷異常
if (Thread.interrupted())
throw new InterruptedException();
//把當前執行緒假如到等待佇列
Node node = addConditionWaiter();
//完全釋放掉鎖
int savedState = fullyRelease(node);
//中斷標識 0 表示在等待的過程中從沒有被中斷過,1表示丟擲中斷異常,2表示重新中斷「偽造」一箇中斷表示
int interruptMode = 0;
//如果不在同步佇列 那麼一致掛起,直到被signal喚醒後移動到同步佇列 或者被中斷
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//中斷可以讓執行緒從park中返回
//檢查是否因為中斷而從park中返回,如果是由於中斷那麼還要判斷中斷在sigal之前還是之後,只要是中斷都會從打斷迴圈
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//acquireQueued是嘗試獲取鎖,返回是否中斷
//如果獲取鎖的時候被中斷 且 中斷髮生在signal之後或者 上面等待的過程沒有被中斷過
//那麼中斷表示置為 重新中斷
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理放棄等待了的節點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果發生了中斷
if (interruptMode != 0)
//選擇是丟擲中斷異常 還是重新中斷自己
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
和同步佇列不同的是,沒有啞節點作為頭節點的操作,也不需要自旋,因為當前節點必然是獲取到鎖的節點(這裡的鎖指的是獨佔鎖)並且節點的狀態初始的時候預設是CONDITION,並且注意等待佇列中的節點是使用nextWaiter屬性串聯起來的,是一個單向連結串列,當前節點是獨佔模式中的頭節點,無需設定pre next指標,釋放鎖後自然會有其他節點獲取到鎖設定自己為頭節點
final int fullyRelease(Node node) {
boolean failed = true;
try {
//對於ReentrantLock state 表示的是重入了幾次,
int savedState = getState();
//完全釋放
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
這裡把原來的state返回了方便後續,等待結束後再次佔用恢復到等待錢的狀態
目前節點已經進入到了等待佇列,來到同步佇列存在兩個可能——其他執行緒Signal到當前節點,or當前執行緒被中斷後自己入隊,這部分原始碼最好結合喚醒操作原始碼一起看
final boolean isOnSyncQueue(Node node) {
//這裡node.waitStatus == Node.CONDITION 成立 說明還沒被singal,喚醒呼叫transferForSignal第一行程式碼就是cas狀態從CONDITION到0
//需要判斷node.prev==null(前面的成立那就短路了) 說明transferForSignal第一行程式碼CAS改為0結束 但是沒還沒來得及執行後續的enq,也就是沒來得及接到同步佇列後
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果後面有節點 那麼一定是在同步佇列的
if (node.next != null)
return true;
//說明上面第一個if沒有成立 說明執行的時候來到同步佇列,
//第二個if沒有成立說明 執行的時候沒有沒有節點
//說明剛來到共用佇列
//可能出現尾分叉 pre指向尾部,但是自己還沒能CAS為尾,這種情況進入這個方法的一瞬間如果可以成功CAS為尾巴,或者在之前CAS為尾巴, 那麼視為已經在同步佇列,反之視為不在同步佇列
return findNodeFromTail(node);
}
node.waitStatus == Node.CONDITION
假如到等待佇列的時候節點初始狀態為CONDITION,CONDITION表示當前節點處於等待狀態,喚醒當前節點的第一件事就是讓CAS設定當前節點狀態從CONDITION為0,等待執行緒被中斷後檢查中斷也是先CAS設定當前節點狀態從CONDITION為0,所以如果節點狀態為CONDITION那麼一定是在等待佇列
node.prev == null
喚醒當前節點後續會讓當前節點進入同步佇列(enq
方法)enq首先設定自己的prev為尾巴,並且在等待佇列是nextWaiter串聯起來的不存在前置指標,所以如果前置指標指向的是null那麼一定是在等待佇列
node.next != null
等待佇列是nextWaiter串聯起來的不存在next指標,那麼如果next指向的不是null,說明一定在同步佇列,並且這是進入同步成功且後續有其他節點排在當前節點後面
findNodeFromTail
從尾部開始查詢當前節點
這裡需要複習下enq
完整入隊的做法,先設定自己的pre指向尾巴,然後cas設定自己為尾巴,後改變前置節點的next,一瞬間只可能存在一個執行緒CAS設定自己為尾成功,就出現了尾部分叉的情況,從尾部開始搜尋主要原因是——效率,節點都是從尾部入隊的,從尾部開始搜尋肯定是大於從頭部開始搜尋的
上面我們看到,如果沒有其他執行緒喚醒,當前執行緒無法移動到同步佇列,isOnSyncQueue
一定為真,為真就會被park節省CPU資源。退出while迴圈的另外一種方法就是當前執行緒被中斷。但是
await
是對中斷敏感的,喚醒前中斷的話當前執行緒在獲取到鎖後需要丟擲中斷異常, checkInterruptWhileWaiting
的返回值有三種
private int checkInterruptWhileWaiting(Node node) {
//如果發生了中斷,這種是await中被掛起的是被中斷打斷
return Thread.interrupted() ?
//檢查中斷在signal 之前還是signal 之後 如果之前 那麼後續丟擲異常 反之只需要補上中斷,後續補上的中斷是在當前執行緒拿到鎖後的中斷
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
這裡可以看到Thread.interrupted()
重置中斷標誌,並且返回是否中斷,如果是false 那麼直接返回0,表示沒有被中斷。反之需要呼叫transferAfterCancelledWait
進行進一步的判斷
final boolean transferAfterCancelledWait(Node node) {
//如果這裡設定成功 那麼後續存線上程signal當前節點 那麼會失敗
//這裡設定成功 說明我們由於中斷被喚醒,且這個喚醒在singal之前
//說明我們成功放棄其他執行緒的signal
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
//入隊
enq(node);
//返回true 代表中斷在signal 之前
return true;
}
//反之如果上面CAS失敗,說明中斷髮生在Signal之後 那麼當前執行緒讓步 等待signal的執行緒把我們移動到同步佇列
while (!isOnSyncQueue(node))
Thread.yield();
//中斷在signal 之後返回false
return false;
}
之前我們說過,喚醒方法的第一步就是CAS設定等待狀態從CONDITION到0,所以
compareAndSetWaitStatus(node, Node.CONDITION, 0)
成功,說明當前節點沒有被其他執行緒喚醒,那麼當前執行緒自己呼叫enq
入隊,返回true(true表示放棄等待,在checkInterruptWhileWaiting中的作用就是讓checkInterruptWhileWaiting返回THROW_IE,因為這意味著中斷在喚醒之前)compareAndSetWaitStatus(node, Node.CONDITION, 0)
失敗,說明當前節點已經被喚醒,喚醒發出的執行緒會把當前節點移動到同步佇列的,所以後續只需要判斷自己是否在同步佇列,如果不在那麼就讓出CPU資源,直到喚醒執行緒移動當前節點到同步佇列,返回false(在checkInterruptWhileWaiting中的作用就是讓checkInterruptWhileWaiting返回REINTERRUPT,因為這意味著中斷在喚醒之後)//所以只要當前執行緒在等待的途中被中斷,都會讓它結束掉 這個while迴圈
//只是被中斷在喚醒之前的時候自己入隊,中斷在喚醒之後的話讓喚醒發出的執行緒移動自己到同步佇列
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
acquireQueued
在AQS獨佔模式已經講解了,該方法返回一個布林值,true表示獲取的的途中被中斷了,false則反之
//如果獲取的途中被中斷了 且 interruptMode != THROW_IE 前面我們說過THROW_IE表示中斷在喚醒之前,所以說這裡兩種情況下會設定interruptMode=REINTERRUPT
//1 等待的時候沒有被中斷 interruptMode之前一直為0
//2 等待的時候被中斷 但是中斷在 喚醒之後 interruptMode之前一直為REINTERRUPT
//也就是說,interruptMode = REINTERRUPT有兩種情況中斷在 喚醒之後和 從等待佇列移動到同步佇列獲取鎖的途中被中斷
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
還需要注意的是acquireQueued是在獲取到鎖之後才會返回,沒有獲取鎖會一直自旋掛起的
常規的連結串列操作,這裡不需要保證執行緒安全,因為當前節點執行此方法的時候必然已經獲取到了獨佔鎖
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
//0 表示從始至終沒被中斷過,即等待喚醒沒被中斷,獲取鎖也沒有被中斷,這種情況下不需要響應中斷
if (interruptMode != 0)
//處理中斷
reportInterruptAfterWait(interruptMode);
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
//THROW_IE——丟擲中斷異常
if (interruptMode == THROW_IE)
throw new InterruptedException();
//REINTERRUPT——自我中斷補上中斷標識
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
也就是說
當前執行緒拿到鎖之後
丟擲中斷異常當前執行緒拿到鎖之後
自我中斷(避免影響到執行緒中根據中斷標識進行操作的程式碼邏輯)public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//獲取中斷標識,並且重置中斷標識
if (Thread.interrupted())
//等待的時候被中斷過
interrupted = true;
}
//獲取鎖 or 等待的時候被中斷 都只是自我中斷補上中斷標誌
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
程式碼和await
套路差不多,區別在於awaitUninterruptibly
無論是等待中被中斷(無論是在喚醒前還是喚醒後),亦或者是在獲取鎖的過程中被中斷,都只是補上中斷標誌位
超時等待程式碼邏輯都差不多,我們挑取一個看一看
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//超時了
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//剩餘時間 大於閾值才進行掛起,
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
和await()
一樣的套路,只是在掛起上加了一層判斷——只有大於閾值的時候才掛起,以及如果等待超時了會呼叫transferAfterCancelledWait
方法,這個方法上面說過:嘗試自己回到同步佇列,但是如果進入方法的一瞬間被喚醒了,會讓當前執行緒讓步,等待喚醒執行緒將當前執行緒移動到同步佇列
public final void signalAll() {
//要求喚醒的執行緒必須獨佔鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
程式碼並不複雜,就是迴圈呼叫transferForSignal
喚醒所有等待佇列中的節點,transferForSignal
返回true 表示成功喚醒,返回false表示喚醒之前當前節點已經放棄
final boolean transferForSignal(Node node) {
//首先CAS設定狀態從Condition到0
//這個和放棄等待具有一個競爭的關係,可以判斷是放棄在前 還是喚醒在前
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//到這說明沒有取消等待 那麼讓當前節點進入到同步佇列
Node p = enq(node);
//enq返回的前置節點 這裡是前置節點的狀態
int ws = p.waitStatus;
//前置節點取消了,或者CAS修改前置節點SINGNAL失敗,那麼喚醒當前執行緒讓他自己去入隊,
//這裡有個小細節,如果前置節點在當前節點入隊之前就放棄了,那麼會cancelAquire方法沒辦法喚醒到當前節點的執行緒
//如果前置節點在CAS操作的時候放棄了,喚醒當前執行緒,前置執行緒的放棄和當前執行緒自己主動入隊,雙執行緒一起操作,提高效率
//如果前置節點一直沒有放棄,cas成功了,那麼當前執行緒等著就好
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
public final void signal() {
//同樣要求獨佔
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//transferForSignal返回true 表示成功喚醒,返回false表示喚醒之前當前節點已經放棄
//所有這裡while表示必須成功喚醒一個直到佇列無節點喚醒
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
下面我們研究下,doug lea是如何判斷中斷在喚醒前還是喚醒後的
final boolean transferAfterCancelledWait(Node node) {
//1
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
//3
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
final boolean transferForSignal(Node node) {
//2
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
假如同一瞬間,執行緒B在等待的時候被中斷,執行緒A且在此時企圖喚醒B,這時候B執行到1這一行,A執行到2這一行,它們兩個執行緒都嘗試把B執行緒代表的節點狀態從CONDITION修改位0,這兩個CAS操作必然有一個前有一個後
迴圈柵欄(也稱做屏障),柵欄的作用是攔住一系列執行緒,迴圈意味著,這個柵欄可以迴圈使用
下文中的柵欄和屏障都是指的CyclicBarrier
一種同步輔助工具,它允許一組執行緒相互等待以達到共同的障礙點。 CyclicBarriers 在涉及固定大小的執行緒組的程式中很有用,這些執行緒組必須偶爾相互等待。屏障被稱為迴圈的,因為它可以在等待執行緒被釋放後重新使用。 CyclicBarrier 支援一個可選的 Runnable 命令,該命令在每個屏障點執行一次,且在在隊伍中的最後一個執行緒到達之後,但在任何執行緒被釋放之前之前。
CountDownLatch只可以使用一次,但是CyclicBarrier 可以使用多次(使用reset()
方法重置)CountDownLatch是一組執行緒等待另外一組執行緒釋放共用鎖,CyclicBarrier 是一組執行緒中每一個執行緒都等待其他執行緒一起執行到一個點
CountDownLatch 可以理解moba遊戲開始遊戲的確認,需要等待五個人都同意後才可以進入選英雄介面(選擇英雄的執行緒等待未確認人數到達0後執行)
CyclicBarrier 是等待五個人都選完英雄點選確定後,五個人才能進入讀條環節,五個人都讀條完才能一起進入遊戲(五個人中任何一個人都要等其他人選完英雄後才能一起進入讀條,五個人中每一個人都要等其他人讀條完才能一起進入峽谷)
方法 | 描述 |
---|---|
public CyclicBarrier(int parties, Runnable barrierAction) | 表示一共有多少個執行緒(parties個)一起到達後一起執行下一個操作(barrierAction)barrierAction由最後到達的執行緒執行 |
public CyclicBarrier(int parties) | 表示一共有多少個執行緒(parties個)一起到達後才能繼續執行 |
public int getParties() | 返回觸發此障礙所需的參與方數量,派對需要多少人才能開始 |
public int await() | 當前執行緒等待所有執行緒都在屏障上呼叫此方法,如果當前執行緒不是最後一個到達的執行緒那麼會處於掛起狀態,除非出現最後一個執行緒到達,或者其他執行緒中斷掛起的執行緒,或者其他執行緒使用的是超時await方法並且超時,或者其他執行緒呼叫屏障的重置方法 |
int await(long timeout, TimeUnit unit) | 比上面的await多一個當前執行緒超時的機制 |
public boolean isBroken() | 查詢此屏障是否處於損壞狀態,執行緒的中斷,超時,出現異常都將導致屏障破碎 |
public void reset() | 將屏障重置為其初始狀態。如果任何執行緒當前在屏障處等待那麼將返回一個 BrokenBarrierException。 |
public int getNumberWaiting() | 等待的執行緒的數量 |
/*保證CyclicBarrier一些執行緒安全的鎖 */
private final ReentrantLock lock = new ReentrantLock();
/** 呼叫await方法執行緒將在此condition上面等待*/
private final Condition trip = lock.newCondition();
/** 這個「派對」需要多少執行緒都到達 */
private final int parties;
/* 執行緒都到達之後執行此方法 */
private final Runnable barrierCommand;
/** 代,每次 CyclicBarrier 構造或者重置的時候都會建立新的一代,其中只有一個屬性記錄柵欄是否破碎*/
private Generation generation = new Generation();
/**
* 還需要等待多少個執行緒到達
*/
private int count;
其中比較難以理解的就是Generation
好比一群好朋友一起去看電影,一開始我們約定好5一起去看(parties=5)後續陸續來了3人 (count=5-3)但是突然有人等待的途中被她的媽媽打電話叫回去了(被打斷了)導致丟擲中斷異常並且使柵欄破碎,或者有人性急進行超時等待,還沒有來就呼籲朋友們都別等了,已經到達的人決定不在等待一起去看電影
什麼叫Generation:
一個過山車有10個座位,景區常常需要等夠10個人了,才會去開動過山車。於是我們常常在欄杆(barrier)外面等,等湊夠了10個人,工作人員就把欄杆開啟,讓10個人通過;然後再將欄杆歸位,後面新來的人還是要在欄杆外等待。這裡,前面已經通過的人就是一「代」,後面再繼續等待的一波人就是另外一「代」,欄杆每開啟關閉一次,就產生新一的「代」。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
//parties 這是一個final 變數,構造後將不允許改變
this.parties = parties;
//初始的時候還有多少個執行緒沒有到達(count)就等於 parties
this.count = parties;
//執行緒到達後都之下此方法
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
private void breakBarrier() {
//設定當前這一代打破標記為true
generation.broken = true;
count = parties;
//並且喚醒所有等待的執行緒
trip.signalAll();
}
相當於存在一個人無法赴約了,為了不讓其他人無限制的等下去(其他人都等待在trip的等待佇列上)選擇打破柵欄,意味著喚醒其他人說別等了,我有事情來不了
顯然breakBarrier的呼叫執行緒必須是當前已經持有鎖的執行緒,所以上面程式碼執行緒安全,後續的程式碼我們將看到什麼情況下柵欄會被打破
private void nextGeneration() {
//喚醒當前等待佇列上的執行緒
trip.signalAll();
//重置
count = parties;
generation = new Generation();
}
相當於過山車湊齊了一批次人,或者說工作人員發現人流量不行,一直等下去也不行,工作人員讓之前準備玩的人發車玩(喚醒等待的執行緒)並且重置,開啟下一代,等待下一批次的人
這也是CyclicBarrier可以迴圈使用的關鍵
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
這相當於工作人員看人流量不行,等下去顧客得抱怨了,直接讓等待的人玩(喚醒)並且開啟下一代,這裡首先獲取了下鎖,因為可能存在多個工作人員,其中任何一個覺得不行都可以選擇這麼做。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
直接呼叫的了dowait(boolean timed, long nanos)
方法包含兩個引數,第一個是否超時等待,第二個等待多久
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//當前這一代
final Generation g = generation;
//如果柵欄已經被打破 那麼丟擲BrokenBarrierException
//意味著我們使用的時候在外層catch住,進行應對柵欄被打破的業務邏輯即可
if (g.broken)
throw new BrokenBarrierException();
//如果執行緒已經被中斷,那麼復位中斷表示,並且打破柵欄,丟擲中斷異常
//意味著我們使用的時候在外層catch住,進行應對中斷後的業務邏輯即可
if (Thread.interrupted()) {
//打破柵欄意味著後續的執行緒指向await都會丟擲BrokenBarrierException,而不是等待所有執行緒就位後重獲自由
breakBarrier();
throw new InterruptedException();
}
//至此意味著已經有一個執行緒到達了 count自減
int index = --count;
//如果全部都到了,並且說明當前執行緒是最後一個到達的
if (index == 0) {
boolean ranAction = false;
try {
//構造時候指定需要執行的
final Runnable command = barrierCommand;
//直接呼叫的run 而不是啟動一個執行緒去執行
if (command != null)
command.run();
ranAction = true;
//開啟下一代,相當於這一批過上車的人湊齊了 發車了,那麼開啟等待下一批次人來
nextGeneration();
return 0;
} finally {
//說明barrierCommand執行的時候丟擲異常了
//直接打破柵欄
if (!ranAction)
breakBarrier();
}
}
//到達這裡說明,還有執行緒沒有到達,當前執行緒需要等待其他執行緒到達
for (;;) {
try {
//非超時等待
if (!timed)
//那麼讓當前執行緒放棄鎖,呼叫Condition.await()
trip.await();
else if (nanos > 0L)//超時等待,並且等待時間大於0
//呼叫Condition的超時等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//如果還是當前這一代,並且柵欄沒有被打破,那麼打破柵欄
if (g == generation && ! g.broken) {
breakBarrier();
//丟擲中斷
throw ie;
} else {
//如果換代了說明有執行緒A呼叫了nextGeneration,如果被柵欄被打破了說有有執行緒A呼叫了breakBarrier 這兩個方法都會喚醒所以等待的執行緒
//自我中斷 重置中斷標誌(也許外層執行緒方法中根據判斷中斷標誌進行不同的業務邏輯,所以要重置中斷標誌)
Thread.currentThread().interrupt();
}
}
//等待中被喚醒,或者超時等待時間過了 執行到此 發現柵欄被打破了 丟擲異常
if (g.broken)
throw new BrokenBarrierException();
//如果開啟了新的一代
if (g != generation)
return index;
//超時等待但是等待的時間小於等於0,意味著不等了 直接打破柵欄
//nanos是上面trip.awaitNanos(nanos)的返回值,指剩餘等待時間
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
如果當前執行緒是最後一個執行緒,它會負責呼叫barrierCommand的run方法(如果barrierCommand不為空的話)如果執行run失敗(run丟擲異常)那麼會打破柵欄,反之如果允許成功並且開啟下一代,開啟下一代會喚醒所有等待的執行緒,相當於最開始約定好5人一起看電影,最後來的買奶茶,E是最後來的先賣奶茶,然後到達目的地跟朋友們打招呼:不好意思我來了,我們走唄(喚醒其他等待的執行緒),如果賣奶茶的途中出現意外,那麼告知朋友們你們先去看(打破柵欄)
如果當前執行緒不是最後一個,那麼需要等待其他執行緒一起到達,在等待的途中會出現以下情況:
當前執行緒被喚醒,這種情況說明是有執行緒執行了breakBarrier 或者 nextGeneration,那麼condition.await 或者awaitNanos 執行完後需要判斷是柵欄被打破還是正常所以執行緒都到達了
//是柵欄被打破 那麼丟擲BrokenBarrierException
if (g.broken)
throw new BrokenBarrierException();
//如果是正常所以執行緒都到達了 那麼繼續
if (g != generation)
return index;
類似於,五個人看電影,ABCD被E通知不要繼續等待了,ABCD要問下E:「你是馬上到了,還是來不了了,如果到了我們團購票,如果來不了我們只能單獨買票」這裡根據E的不同情況進行不同買票其實就是catch住 BrokenBarrierException和正常返回中不同程式碼邏輯
try{
cyclicBarrier.await();
//買團體票
}cath(BrokenBarrierException e){
//單獨買票
}
當前執行緒被中斷了,這裡有需要分情況討論下:
ABCD都正常等待,但是E被中斷了
這時候E要負責去通知ABCD別等待了(打破柵欄)隨後自己丟擲中斷異常
//對於E來說 可能是這樣的
try{
cyclicBarrier.await();
//開心看電影
}catch(InterruptedException e){
//如果媽媽打電話讓我回家 那麼就回家
}
//對於ABCD看來說 E打破柵欄並且喚醒了他們,後續就會來到 (1)情況1
try{
cyclicBarrier.await();
//買團體票
}cath(BrokenBarrierException e){
//單獨買票
}
ABC都正常等待,D在E之前中斷,E隨後中斷
D會先打破柵欄丟擲中斷異常,E隨後被中斷從等待中喚醒只需要復位中斷標誌,繼續執行到情況1,就好像E的中斷是在D的喚醒之後一樣(就好像媽媽打電話讓回家,是在D之後,這時候已經去買票的路上了)
//對於E來說程式碼是這樣的
try{
cyclicBarrier.await();
while(Thread.currentThread.isInterrupted()){
//如果沒有中斷 那麼一直看電影
}
//反之
throw new InterruptedException()
}catch(InterruptedException e){
}
ABCE都正常等待,D來了,但是E在D來了之後才被中斷,相當於是在買票的途中被中斷
//對於E來說程式碼是這樣的
try{
cyclicBarrier.await();
//買票
while(Thread.currentThread.isInterrupted()){
//如果沒有中斷 那麼一直看電影
}
//反之
throw new InterruptedException()
}catch(InterruptedException e){
}
這中情況下,D執行了nextGeneration
後釋放了鎖,但是E需要拿到鎖才能繼續執行
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//D執行完nextGeneration 那麼這裡就不成立了
//也就是喚醒是在中斷之前的
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//進入此,說明是在一起買票的時候被中斷的
Thread.currentThread().interrupt();
}
}
//省略
}
等待超時了
五個人中存在一個人是急性子,他說爺只能等10分鐘,10分鐘過去了,他會說跟爺走,都別等了,隨後自己丟擲中斷異常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
//對於急性子的人來說程式碼是這樣的
try{
cyclicBarrier.await(10分鐘);
}catch(TimeoutException e){
//等了十分鐘 火氣很大地看電影,或者火氣很大的發脾氣中
}
nextGeneration
之前被中斷且順利拿到鎖barrierCommand
.run()(如果有barrierCommand)失敗barrierCommand
.run()(如果有barrierCommand)後一個支援在獲取元素但是佇列中不存在會讓當前執行緒等待,支援線上程新增元素但佇列已滿的時候讓當前執行緒等待的佇列。等待佇列中的入隊和出隊,都有四種特性方法應對容量不足或者沒有元素——丟擲異常,返回特殊值(入隊對於false,出隊就是null),阻塞當前執行緒,超時等待。由於null值對於阻塞佇列來說是一種特殊值(用於獲取元素但是佇列中,直接返回null)所以等待佇列是不接受null值的(丟擲NPE),等待佇列中的大部分操作是執行緒安全,但是例如addAll,removeAll等方法也許不是執行緒安全的(這取決於具體實現),等待佇列就是生產者消費者模型的絕佳幫手,生產者只需要生產完向佇列中塞(一般是空間不足掛起生產執行緒)消費者就從佇列無腦取(一般是無元素的時候進行等待)
方法 | 描述 |
---|---|
boolean add(E e) | 如果可以在不違反容量限制的情況下立即將指定元素插入此佇列,則返回 true,如果當前沒有可用空間則丟擲 IllegalStateException |
boolean offer(E e) | 如果可以在不違反容量限制的情況下立即將指定元素插入此佇列,則返回 true,如果當前沒有可用空間則返回 false。在使用容量受限的佇列時,這種方法一般更可取 |
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException | boolean offer(E e)的超時等待版本,如果有如果可以在不違反容量限制的情況下立即將指定元素插入此佇列,則返回 true,如果等待超時了還不能入隊那麼返回false,在等待的中途被中斷丟擲中斷異常 |
void put(E e) throws InterruptedException | 將元素入隊,如果沒有空間進行無限等待,在等待的中途被中斷丟擲中斷異常 |
E take() throws InterruptedException | 檢索並刪除此佇列的頭部,如果沒有元素可以刪除那麼一直等待,在等待的中途被中斷丟擲中斷異常 |
boolean remove(Object o) | 如果該佇列包含一個或多個這樣的元素,則刪除一個滿足 o.equals(e) 的元素 ,則返回 true。反之返回false |
E poll(long timeout, TimeUnit unit) throws InterruptedException | 超時等待獲取並移除佇列頭部,如果超時但是沒有元素那麼返回null,在等待的中途被中斷丟擲中斷異常 |
E take() throws InterruptedException | 無限期等待的獲取並移除佇列頭部,在等待的中途被中斷丟擲中斷異常 |
E element() | 檢索但不刪除此佇列的頭部。如果佇列為空那麼丟擲NoSuchElementException |
E peek() | 檢索但不刪除此佇列的頭部,如果此佇列為空,則返回 null |
int remainingCapacity() | 返回此佇列理想情況下(在沒有記憶體或資源限制的情況下)可以不阻塞地接受的附加元素的數量,如果沒有內在限制,則返回 Integer.MAX_VALUE。不能總是通過檢查剩餘容量來判斷插入元素的嘗試是否會成功,因為可能存在另一個執行緒即將插入或刪除元素的情況。返回:剩餘容量 |
為了實現,生產者在容量不足的時候阻塞,消費者在沒有元素的時候阻塞,阻塞佇列都基於等待通知模式
final ReentrantLock lock;
/**當佇列為空,並且當前執行緒嘗試等待地獲取元素時,將在此等待佇列上等待*/
private final Condition notEmpty;
/**當佇列滿,並且當前執行緒嘗試等待地插入元素時,將在此等待佇列上等待*/
private final Condition notFull;
等待地獲取元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//響應中斷的獲取鎖 如果獲取鎖的途中被中斷那麼丟擲中斷異常
lock.lockInterruptibly();
try {
//佇列為空 那麼一直等待
while (count == 0)
notEmpty.await();
//反之返回佇列頭並且會喚醒等待地插入元素的執行緒,喚醒等待的生產者
return dequeue();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
//如果超時 返回null
if (nanos <= 0)
return null;
//不同的超時等待,此方法返回的是等待的剩餘時間
nanos = notEmpty.awaitNanos(nanos);
}
//類似
return dequeue();
} finally {
lock.unlock();
}
}
等待地插入元素
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
//入隊 並且這裡會喚醒等待獲取元素的執行緒
enqueue(e);
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//佇列滿
while (count == items.length) {
//超時返回fasle 表示入隊失敗
if (nanos <= 0)
return false;
//超時等待,返回的是剩餘等待時間
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}