關注王有志,一個分享硬核Java技術的互金摸魚俠
歡迎你加入Java人的提桶跑路群:共同富裕的Java人
今天我們來學習AQS家族的「外門弟子」:CyclicBarrier。
為什麼說CyclicBarrier是AQS家族的「外門弟子」呢?那是因為CyclicBarrier自身和內部類Generation並沒有繼承AQS,但在原始碼的實現中卻深度依賴AQS家族的成員ReentrantLock。就像修仙小說中,大家族會區分外門和內門,外門弟子通常會藉助內門弟子的名聲行事,CyclicBarrier正是這樣,因此算是AQS家族的「外門弟子」。在實際的面試中,CyclicBarrier的出現的次數較少,通常會出現在與CountDownLatch比較的問題當中。
今天我們就逐步拆解CyclicBarrier,來看看它與CountDownLatch之間到底有什麼差別。
先從CyclicBarrier的名字開始入手,Cyclic是形容詞,譯為「迴圈的,週期的」,Barrier是名詞,譯為「屏障,柵欄」,組合起來就是「迴圈的屏障」,那麼該怎麼理解「迴圈的屏障」呢?我們來看CyclicBarrier的註釋是怎麼解釋的:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
CyclicBarrier是一種同步輔助工具,允許一組執行緒等待彼此到達共同的屏障點。
The barrier is called cyclic because it can be re-used after the waiting threads are released.
因為在等待執行緒釋放後可以重複使用,所以屏障被稱為迴圈屏障。
看起來與CountDownLatch有些相似,我們通過一張圖來展示下CyclicBarrier是怎樣工作的:
部分執行緒到達屏障後,會在屏障處等待,只有全部執行緒都到達屏障後,才會繼續執行。如果以CountDownLatch中越野徒步來舉例的話,把老闆拿掉,選手之間的互相等待,就是CyclicBarrier了。
另外,註釋中說CyclicBarrier是「re-used」,即可重複使用的。回想一下CountDownLatch的實現,並未做任何重置計數器的工作,即當CountDownLatch的計數減為0後不能恢復,也就是說CountDownLatch的功能是一次性的。
Tips:實際上,可以用CountDownLatch實現類似於CyclicBarrier的功能。
我們用沒有老闆參加的越野徒步來舉例,部分先到的選手要等待後到的選手一起吃午飯,用CyclicBarrier來實現的程式碼是這樣的:
// 初始化CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep((finalI + 1));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println("選手[" + finalI + "]到達終點,等待其他選手!!!");
// 執行緒在屏障點處等待
cyclicBarrier.await();
System.out.println("選手[" + finalI + "]開始吃午飯啦!!!");
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}).start();
}
用法和CountDownLatch很相似,建構函式設定CyclicBarrier需要多少個執行緒達到屏障後統一行動,區別是CyclicBarrier在每個執行緒中都呼叫了CyclicBarrier#await
,而我們在使用CountDownLatch時只在主執行緒中呼叫了一次CountDownLatch#await
。
那CountDownLatch可以線上程中呼叫CountDownLatch#await
嗎?答案是可以的,這樣使用的效果和CyclicBarrier是一樣的:
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep((finalI + 1));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("選手[" + finalI + "]到達終點!!!");
countDownLatch.countDown();
try {
countDownLatch.await();
System.out.println("選手[" + finalI + "]開始吃午飯啦!!!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
通過上面的例子,我們不難想到CyclicBarrier#await
方法是同時具備了CountDownLatch#countDown
方法和CountDownLatch#await
方法的能力,即執行了計數減1,又執行了暫停執行緒。
我們先整體認識一下CyclicBarrier:
CyclicBarrier的內部結構比CountDownLatch複雜一些,除了我們前面提到的藉助AQS的「內門弟子」ReentrantLock型別的lock
和Condition型別的trip
外,CyclicBarrier還有兩個「特別」的地方:
內部類Generation,直譯過來是「代」,它起到什麼作用?
Runnable型別的成員變數barrierCommand
,它又做了些什麼?
其餘的部分,大部分可以在CountDownLatch中找到對應的方法,或者通過名稱我們就很容易得知它們的作用。
CyclicBarrier提供了兩個(實際是一個)構造方法:
// 需要到達屏障的執行緒數
private final int parties;
// 所有執行緒都到達後執行的動作
private final Runnable barrierCommand;
// 計數器
private int count;
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) {
throw new IllegalArgumentException();
}
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
第二個建構函式接收了兩個引數:
parties
:表示需要多少個執行緒到達屏障處呼叫CyclicBarrier#await
;
barrierAction
:所有執行緒到達屏障後執行的動作。
構造方法的程式碼一如既往的簡單,只有一處比較容易產生疑惑,parties
和count
有什麼區別?
首先來看成員變數的宣告,parties
使用了final
,表明它是不可變的物件,代表CyclicBarrier需要幾個執行緒共同到達屏障處;而count
是計數器,初始值是parties
,隨著到達屏障處的執行緒數量增多count
會逐步減少至0。
private static class Generation {
Generation() {}
boolean broken;
}
Generation用於標記CyclicBarrier的當前代,Doug Lea是這麼解釋它的作用的:
Each use of the barrier is represented as a generation instance. The generation changes whenever the barrier is tripped, or is reset.
每次使用屏障(CyclicBarrier)都需要一個Generation範例。無論是通過屏障還是重置屏障,Generation都會發生改變。
Generation中的broken
用於標記當前的CyclicBarrier是否被打破,預設為false,值為true時表示當前CyclicBarrier已經被打破,此時CyclicBarrier不能正常使用,需要呼叫CyclicBarrier#reset
方法重置CyclicBarrier的狀態。
前面我們猜測CyclicBarrier#await
方法即實現了計數減1,又實現了執行緒等待的功能,下面我們就通過原始碼來驗證我們的想法:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
兩個過載方法都指向了CyclicBarrier#dowait
方法:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
// 使用ReentrantLock
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 第2部分
// 獲取CyclicBarrier的當前代,並檢查CyclicBarrier是否被打破
final Generation g = generation;
if (g.broken) {
throw new BrokenBarrierException();
}
// 執行緒被中斷時,呼叫breakBarrier方法
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 第3部分
//計數器減1
int index = --count;
// 計數器為0時表示所有執行緒都到達了,此時要做的就是喚醒等待中的執行緒
if (index == 0) {
boolean ranAction = false;
try {
// 執行喚醒前的操作
final Runnable command = barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
// CyclicBarrier進入下一代
nextGeneration();
return 0;
} finally {
if (!ranAction) {
breakBarrier();
}
}
}
// 第4部分
// 只有部分執行緒到達屏障處的情況
for (;;) {
try {
//呼叫等待邏輯)
if (!timed) {
trip.await();
} else if (nanos > 0L) {
nanos = trip.awaitNanos(nanos);
}
} catch (InterruptedException ie) {
// 執行緒被中斷時,呼叫breakBarrier方法
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken) {
throw new BrokenBarrierException();
}
// 如果不是當前代,返回計數器的值
if (g != generation) {
return index;
}
// 如果等待超時,呼叫breakBarrier方法
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
CyclicBarrier#dowait
方法看起來很長,但如果拆成3部分來看邏輯並不複雜:
第1部分:CyclicBarrier與執行緒的狀態校驗;
第2部分:當計數器減1後值為0時,喚醒所有等待中的執行緒;
第3部分:當計數器減1後值不為0時,執行緒進入等待狀態。
先來看第1部分,CyclicBarrier與執行緒的狀態校驗的部分,先是判斷CyclicBarrier是否被打破,接著判斷當前執行緒是否為中斷狀態,如果是則呼叫CyclicBarrier#breakBarrier
方法:
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
CyclicBarrier#breakBarrier
方法非常簡單,只做了3件事:
標記CyclicBarrier被打破;
重置CyclicBarrier的計數器;
喚醒全部等待中的執行緒。
也就是說,一旦有個執行緒標記為中斷狀態,都會直接打破CyclicBarrier的屏障。
我們先跳過第2部分的喚醒邏輯,直接來看第3部分執行緒進入等待狀態的邏輯。根據timed
引數選擇呼叫Condition不同的等待方法,隨後是對異常的處理和執行緒中斷狀態的處理,同樣是呼叫CyclicBarrier#breakBarrier
,標記CyclicBarrier不可用。執行緒進入等待狀態的邏輯並不複雜,本質上是通過AQS的Condition來實現的。
最後來看第2部分喚醒所有等待中執行緒的操作,根據計數器是否為0判斷是否需要進行喚醒。如果需要喚醒,最後一個執行CyclicBarrier#await
的執行緒執行barrierCommand
(此時尚未執行任何執行緒喚醒的操作),做通過屏障前的處理操作,接著呼叫CyclicBarrier#nextGeneration
方法:
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}
CyclicBarrier#nextGeneration
方法也做了3件事:
喚醒所有Condition上等待的執行緒;
重置CyclicBarrier的計數器;
建立新的Generation物件。
很符合進入「下一代」的名字,先喚醒「上一代」所有等待中的執行緒,然後重置CyclicBarrier的計數器,最後更新CyclicBarrier的Generation物件,對CyclicBarrier進行重置工作,讓CyclicBarrier進入下一個紀元。
到這裡我們不難發現,CyclicBarrier自身只做了維護計數器和重置計數器的工作,而保證互斥性和執行緒的等待與喚醒則是依賴AQS家族的成員完成的:
ReentrantLock保證了同一時間只有一個執行緒可以執行CyclicBarrier#await
,即同一時間只有一個執行緒可以維護計數器;
Condition為CyclicBarrier提供了條件等待佇列,完成了執行緒的等待與喚醒的工作。
最後我們來看CyclicBarrier#reset
方法:
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 主動打破CyclicBarrier
breakBarrier();
// 使CyclicBarrier進入下一代
nextGeneration();
} finally {
lock.unlock();
}
}
CyclicBarrier#reset
方法都是老面孔,先是CyclicBarrier#breakBarrier
打破上一代CyclicBarrier,既然要重新開始就不要再「懷念」過去了;最後呼叫CyclicBarrier#nextGeneration
開始新的時代。需要注意的是,這裡加鎖的目的是為了保證執行CyclicBarrier#reset
時,沒有任何執行緒正在執行CyclicBarrier#await
方法。
好了,到這裡CyclicBarrier的核心內容我們就一起分析完了,剩下的方法就非常簡單了,相信通過名字大家就可以瞭解它們的作用,並猜到它們的實現了。
Tips:CyclicBarrier#getNumberWaiting
中加了鎖,這是為什麼?
最後的部分,我們來解答下開篇時的面試題,CountDownLatch和Cyclicbarrier有什麼區別?
第1點:CyclicBarrier可以重複使用,CountDownLatch不能重複使用。
無論是正常使用結束,還是呼叫CyclicBarrier#reset
方法,Cyclicbarrier都可以重置內部的計數器
第2點:Cyclicbarrier只阻塞呼叫CyclicBarrier#await
方法的執行緒,而CountDownLatch可以阻塞任意一個或多個執行緒。
CountDownLatch將計數減1與阻塞拆分成了CountDownLatch#countDown
和CountDownLatch#await
兩個方法,而Cyclicbarrier只通過CyclicBarrier#await
完成兩步操作。如果在同一個執行緒中連續CountDownLatch#countDown
和CountDownLatch#await
則實現了與CyclicBarrier#await
方法相同的功能。
好了,今天就到這裡結束了。如果本文對你有幫助的話,請多多點贊支援。最後歡迎大家關注分享硬核技術的金融摸魚俠王有志,我們下次再見!