無論是專案開發還是開原始碼閱讀,多執行緒都是不可或缺的一個重要知識點,基於這個考量,於是總結出本篇文章,討論閉鎖(CountDownLatch)、柵欄(CyclicBarrier)與非同步編排(CompletableFuture)
@Author:Akai-yuan
@更新時間:2023/2/4
主執行緒建立了5個子執行緒,各子任務執行確認動作,期間主執行緒進入等待狀態,直到各子執行緒的任務均已經完成,主執行緒恢復繼續執行。
從多執行緒的角度看,這恰似你建立了一些多執行緒,但是你需要統一管理它們的任務開始時間。
CountDownLatch基於一個同步器實現,並且只有CountDownLatch(int count)一個構造器,指定數量count不得在中途修改它。
核心函數
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(4);
Thread t1 = new Thread(countDownLatch::countDown);
Thread t2 = new Thread(countDownLatch::countDown);
Thread t3 = new Thread(countDownLatch::countDown);
Thread t4 = new Thread(() -> {
try {
// 稍等...
Thread.sleep(1500);
countDownLatch.countDown();
} catch (InterruptedException ignored) {}
});
t1.start();
t2.start();
t3.start();
t4.start();
//直到所有執行緒都對計數器進行減一後,這裡才放行
countDownLatch.await();
System.out.println("所有子執行緒就位,可以繼續執行其他任務");
}
我們仍然用4個執行緒呼叫了start(),但是它們在執行時都在等待countDownLatch的訊號,在訊號未收到前,它們不會往下執行。
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Thread t1 = new Thread(() -> waitForCountDown(countDownLatch));
Thread t2 = new Thread(() -> waitForCountDown(countDownLatch));
Thread t3 = new Thread(() -> waitForCountDown(countDownLatch));
Thread t4 = new Thread(() -> waitForCountDown(countDownLatch));
t1.start();
t2.start();
t3.start();
t4.start();
Thread.sleep(1000);
countDownLatch.countDown();
System.out.println("所有執行緒準備完成");
}
private static void waitForCountDown(CountDownLatch countDownLatch) {
try {
// 等待訊號
countDownLatch.await();
System.out.println("本執行緒等待完畢");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
輸出:
所有執行緒準備完成
本執行緒等待完畢
本執行緒等待完畢
本執行緒等待完畢
本執行緒等待完畢
Process finished with exit code 0
// 定義一個CountDownLatch計數器
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
public void start() {
switch (workerStateUpdater.get(this)) {
case WORKER_STATE_INIT:
if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
//此處呼叫工作執行緒執行CountDownLatch的countDown()方法
//即startTimeInitialized.countDown();
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// 等待startTime被工作執行緒初始化完成
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
柵欄類似於閉鎖,它能阻塞一組執行緒直到某個事件的發生。柵欄與閉鎖的關鍵區別在於,所有的執行緒必須同時到達柵欄位置,才能繼續執行。閉鎖用於等待事件,而柵欄用於等待其他執行緒。
CyclicBarrier與CountDownLatch的區別
CyclicBarrier | CountDownLatch |
---|---|
CyclicBarrier是可重用的,其中的執行緒會等待所有的執行緒完成任務。屆時,屏障將被拆除,並可以選擇性地做一些特定的動作。 | CountDownLatch是一次性的,不同的執行緒在同一個計數器上工作,直到計數器為0 |
CyclicBarrier面向的是執行緒數 | CountDownLatch面向的是任務數 |
在使用CyclicBarrier時,你必須在構造中指定參與共同作業的執行緒數,這些執行緒必須呼叫await()方法 | 使用CountDownLatch時,則必須要指定任務數,至於這些任務由哪些執行緒完成無關緊要 |
CyclicBarrier可以在所有的執行緒釋放後重新使用 | CountDownLatch在計數器為0時不能再使用 |
在CyclicBarrier中,如果某個執行緒遇到了中斷、超時等問題時,則處於await的執行緒都會出現問題 | 在CountDownLatch中,如果某個執行緒出現問題,其他執行緒不受影響 |
// 指定參與方的數量;
public CyclicBarrier(int parties);
// 指定參與方的數量,並指定在本代次結束時執行的程式碼
public CyclicBarrier(int parties, Runnable barrierAction):
//如果當前執行緒不是第一個到達屏障的話,它將會進入等待,直到其他執行緒都到達
//除非發生被中斷、屏障被拆除、屏障被重設等情況
public int await();
//和await()類似,但是加上了時間限制;
public int await(long timeout, TimeUnit unit);
//當前屏障是否被拆除;
public boolean isBroken();
//重設當前屏障。會先拆除屏障再設定新的屏障
public void reset();
//正在等待的執行緒數量
public int getNumberWaiting();
下面以一個簡單的日常對話來講解CyclicBarrier的使用範例
private static String appointmentPlace = "書房";
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("yuan所在的地點:" + appointmentPlace));
// 執行緒Akai
Thread Akai = newThread("Akai", () -> {
System.out.println("yuan,飯好了快來吃飯...");
try {
// 此時Akai在屏障前等待
cyclicBarrier.await();
System.out.println("yuan,你來了...");
// 開始吃飯...
Thread.sleep(2600);
System.out.println("好的,你去洗你的碗吧!");
// 第二次呼叫await
cyclicBarrier.await();
Thread.sleep(100);
System.out.println("好吧,你這個懶豬!");
} catch (Exception e) {
e.printStackTrace();
}
});
// 執行緒yuan
Thread yuan = newThread("yuan", () -> {
try {
// yuan在敲程式碼
Thread.sleep(500);
System.out.println("我在敲程式碼,我馬上就來!");
// yuan到達飯桌前
cyclicBarrier.await();
Thread.sleep(500);
System.out.println("Akai,不好意思,剛剛沉迷於敲程式碼了!");
// 開始吃飯...
Thread.sleep(1500);
// yuan想先吃完趕快洗碗然後溜出去敲程式碼
System.out.println("我吃完了,我要去洗碗了");
// yuan把地點改成了廚房
appointmentPlace = "廚房";
// 洗碗中...
Thread.sleep(1500);
System.out.println("︎yuan終於洗完自己的碗了");
// 第二次呼叫await
cyclicBarrier.await();
System.out.println("Akai你吃完了,你的碗自己去洗吧,我已經在敲程式碼了");
} catch (Exception ignored) {}
});
Akai.start();
yuan.start();
}
輸出結果:
yuan,飯好了快來吃飯...
我在敲程式碼,我馬上就來!
yuan所在的地點:書房
yuan,你來了...
Akai,不好意思,剛剛沉迷於敲程式碼了!
我吃完了,我要去洗碗了
好的,你去洗你的碗吧!
yuan終於洗完自己的碗了
yuan所在的地點:廚房
Akai你吃完了,你的碗自己去洗吧,我已經在敲程式碼了
好吧,你這個懶豬!
CompletableFuture是Future介面的擴充套件和增強。
CompletableFuture完整地繼承了Future介面,並在此基礎上進行了豐富地擴充套件,完美地彌補了Future上述的種種問題。更為重要的是,CompletableFuture實現了對任務的編排能力。藉助這項能力,我們可以輕鬆地組織不同任務的執行順序、規則以及方式。
從某種程度上說,這項能力是它的核心能力。而在以往,雖然通過CountDownLatch等工具類也可以實現任務的編排,但需要複雜的邏輯處理,不僅耗費精力且難以維護。
我們首先來討論CompletableFuture的核心:CompletionStage
顧名思義,根據CompletionStage名字中的"Stage",你可以把它理解為任務編排中的步驟。步驟,即任務編排的基本單元,它可以是一次純粹的計算或者是一個特定的動作。在一次編排中,會包含多個步驟,這些步驟之間會存在依賴、鏈式和組合等不同的關係,也存在並行和序列的關係。這種關係,類似於Pipeline或者流式計算。
既然是編排,就需要維護任務的建立、建立計算關係。為此,CompletableFuture提供了多達50多個方法,但沒有必要全部完全理解,但我們可以通過分類的方式簡化對方法的理解,理解了型別和變種,基本上我們也就掌握了CompletableFuture的核心能力。
這些方法可以總結為以下四類,其他大部分方法都是基於這四種型別的變種:
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("something");
}
});
// 建立nameFuture,返回姓名
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "Akai-yuan";
});
// 使用thenApply()接收nameFuture的結果,並執行回撥動作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "love you," + name;
});
//阻塞獲得表白的結果
System.out.println(sayLoveFuture.get()); // love you,Akai-yuan
一旦理解了supply()的含義,它也就如此簡單。如果你希望用新的執行緒執行任務,可以使用supplyAsync().
// 使用thenApply()接收nameFuture的結果,並執行回撥動作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "愛你," + name;
});
public <U> CompletableFuture <U> thenApplyAsync(
Function <? super T, ? extends U> fn) {
return uniApplyStage(null, fn);
}
作為supply()的檔案,thenApply()並不是唯一的存在,thenAccept()也是。但與thenApply()不同,thenAccept()只接收資料,但不會返回,它的返回型別是Void.
CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> {
System.out.println("愛你," + name);
});
public CompletableFuture < Void > thenAccept(Consumer < ? super T > action) {
return uniAcceptStage(null, action);
}
thenRun()就比較簡單了,不接收任務的結果,只執行特定的任務,並且也不返回結果。
public CompletableFuture < Void > thenRun(Runnable action) {
return uniRunStage(null, action);
}
所以,如果你在回撥中不想返回任何的結果,只執行特定的邏輯,那麼你可以考慮使用thenAccept和thenRun一般來說,這兩個方法會在呼叫鏈的最後面使用。
以上幾種方法都是各玩各的,但thenCompose()與thenCombine()就不同了,它們可以實現對依賴和非依賴兩種型別的任務的編排。
編排兩個存在依賴關係的任務
在前面的例子中,在接收前面任務的結果時,我們使用的是thenApply(). 也就是說,sayLoveFuture在執行時必須依賴nameFuture的完成,否則執行個錘子。
// 建立Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "Akai-yuan";
});
// 使用thenApply()接收nameFuture的結果,並執行回撥動作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "愛你," + name;
});
但其實,除了thenApply()之外,我們還可以使用thenCompose()來編排兩個存在依賴關係的任務。比如,上面的範例程式碼可以寫成:
// 建立Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "Akai-yuan";
});
CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> {
return CompletableFuture.supplyAsync(() -> "愛你," + name);
});
可以看到,thenCompose()和thenApply()的核心不同之處在於它們的返回值型別:
組合兩個相互獨立的任務
考慮一個場景,當我們在執行某個任務時,需要其他任務就緒才可以,應該怎麼做?這樣的場景並不少見,我們可以使用前面學過的並行工具類實現,也可以使用thenCombine()實現。
舉個例子,當我們計算某個勝率時,我們需要獲取她參與的總場次(rounds),以及獲勝的場次(winRounds),然後再通過winRounds / rounds來計算。對於這個計算,我們可以這麼做:
CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);
CompletableFuture < Object > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
return 0.0;
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
});
System.out.println(winRateFuture.get());
thenCombine()將另外兩個任務的結果同時作為引數,參與到自己的計算邏輯中。在另外兩個引數未就緒時,它將會處於等待狀態。
allOf()與anyOf()也是一對孿生兄弟,當我們需要對多個Future的執行進行組織時,就可以考慮使用它們:
allOf()與anyOf()的方法簽名如下:
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
需要注意的是,anyOf()將返回完任務的執行結果,但是allOf()不會返回任何結果,它的返回值是Void.
allOf()與anyOf()的範例程式碼如下所示。我們建立了roundsFuture和winRoundsFuture,並通過sleep模擬它們的執行時間。在執行時,winRoundsFuture將會先返回結果,所以當我們呼叫 CompletableFuture.anyOf時也會發現輸出的是365.
CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
return 500;
} catch (InterruptedException e) {
return null;
}
});
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
return 365;
} catch (InterruptedException e) {
return null;
}
});
CompletableFuture < Object > completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture);
System.out.println(completedFuture.get()); // 返回365
CompletableFuture < Void > completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture);
在CompletableFuture之前,如果要實現所有任務結束後執行特定的動作,我們可以考慮CountDownLatch等工具類。現在,則多了一選項,我們也可以考慮使用CompletableFuture.allOf.
在CompletableFuture鏈式呼叫中,如果某個任務發生了異常,那麼後續的任務將都不會再執行。對於異常,我們有兩種處理方式:exceptionally()和handle().
在鏈式呼叫的尾部使用exceptionally(),捕獲異常並返回錯誤情況下的預設值。需要注意的是,exceptionally()僅在發生異常時才會呼叫。
CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
throw new RuntimeException("總場次錯誤");
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
}).exceptionally(ex -> {
System.out.println("出錯:" + ex.getMessage());
return "";
});
System.out.println(winRateFuture.get());
除了exceptionally(),CompletableFuture也提供了handle()來處理異常。不過,與exceptionally()不同的是,當我們在呼叫鏈中使用了handle(),那麼無論是否發生異常,都會呼叫它。所以,在handle()方法的內部,我們需要通過 if (ex != null) 來判斷是否發生了異常。
CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
throw new RuntimeException("總場次錯誤");
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
}).handle((res, ex) -> {
if (ex != null) {
System.out.println("出錯:" + ex.getMessage());
return "";
}
return res;
});
System.out.println(winRateFuture.get());
當然,如果我們允許某個任務發生異常而不中斷整個呼叫鏈路,那麼可以在其內部通過try-catch消化掉。