轉載請註明出處:
Future表示一個非同步計算的結果。它提供了isDone()來檢測計算是否已經完成,並且在計算結束後,可以通過get()方法來獲取計算結果。在非同步計算中,Future確實是個非常優秀的介面。但是,它的本身也確實存在著許多限制:
並行執行多工:Future只提供了get()方法來獲取結果,並且是阻塞的。所以,除了等待別無他法;
無法對多個任務進行鏈式呼叫:如果你希望在計算任務完成後執行特定動作,比如發郵件,但Future卻沒有提供這樣的能力;
無法組合多個任務:如果你執行了10個任務,並期望在它們全部執行結束後執行特定動作,那麼在Future中這是無能為力的;
沒有例外處理:Future介面中沒有關於例外處理的方法;
Future 注意事項
當 for 迴圈批次獲取 Future 的結果時容易 block,get 方法呼叫時應使用 timeout 限制
Future 的生命週期不能後退。一旦完成了任務,它就永久停在了「已完成」的狀態,不能從頭再來
針對 Future 使用中的同時對多個非同步任務進行編排的不足,java 使用 CompletableFuture是Future介面的擴充套件和增強。CompletableFuture實現了Future介面,並在此基礎上進行了豐富地擴充套件,完美地彌補了Future上述的種種問題。更為重要的是,CompletableFuture實現了對任務的編排能力。藉助這項能力,我們可以輕鬆地組織不同任務的執行順序、規則以及方式。從某種程度上說,這項能力是它的核心能力。而在以往,雖然通過CountDownLatch等工具類也可以實現任務的編排,但需要複雜的邏輯處理,不僅耗費精力且難以維護。
應用場景
描述依賴關係:
thenApply() 把前面非同步任務的結果,交給後面的Function
thenCompose()用來連線兩個有依賴關係的任務,結果由第二個任務返回
描述and聚合關係:
thenCombine:任務合併,有返回值
thenAccepetBoth:兩個任務執行完成後,將結果交給thenAccepetBoth消耗,無返回值。
runAfterBoth:兩個任務都執行完成後,執行下一步操作(Runnable)。
描述or聚合關係:
applyToEither:兩個任務誰執行的快,就使用那一個結果,有返回值。
acceptEither: 兩個任務誰執行的快,就消耗那一個結果,無返回值。
runAfterEither: 任意一個任務執行完成,進行下一步操作(Runnable)。
並行執行:
CompletableFuture類自己也提供了anyOf()和allOf()用於支援多個CompletableFuture並行執行
CompletableFuture 提供了四個靜態方法來建立一個非同步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
這四個方法區別在於:
runAsync 方法以Runnable函數式介面型別為引數,沒有返回結果,supplyAsync 方法Supplier函數式介面型別為引數,返回結果型別為U;Supplier 介面的 get() 方法是有返回值的(會阻塞)
預設情況下 CompletableFuture 會使用公共的 ForkJoinPool 執行緒池,這個執行緒池預設建立的執行緒數是 CPU 的核數(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設定 ForkJoinPool 執行緒池的執行緒數)。如果所有 CompletableFuture 共用一個執行緒池,那麼一旦有任務執行一些很慢的 I/O 操作,就會導致執行緒池中所有執行緒都阻塞在 I/O 操作上,從而造成執行緒飢餓,進而影響整個系統的效能。所以,強烈建議你要根據不同的業務型別建立不同的執行緒池,以避免互相干擾
範例:
Runnable runnable = () -> System.out.println("執行無返回結果的非同步任務"); CompletableFuture.runAsync(runnable); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { System.out.println("執行有返回值的非同步任務"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello World"; }); String result = future.get();
在使用過程中,可以使用 spring 提供的 ThreadPoolTaskExecutor 作為 執行緒池的執行器
join&get
join()和get()方法都是用來獲取CompletableFuture非同步之後的返回值。join()方法丟擲的是uncheck異常(即未經檢查的異常),不會強制開發者丟擲。get()方法丟擲的是經過檢查的異常,ExecutionException, InterruptedException 需要使用者手動處理(丟擲或者 try catch)
當CompletableFuture的計算結果完成,或者丟擲異常的時候,我們可以執行特定的 Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
方法不以Async結尾,意味著Action使用相同的執行緒執行,而Async可能會使用其它的執行緒去執行(如果使用相同的執行緒池,也可能會被同一個執行緒選中執行)。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } if (new Random().nextInt(10) % 2 == 0) { int i = 12 / 0; } System.out.println("執行結束!"); return "test"; }); future.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String t, Throwable action) { System.out.println(t+" 執行完成!"); } }); future.exceptionally(new Function<Throwable, String>() { @Override public String apply(Throwable t) { System.out.println("執行失敗:" + t.getMessage()); return "異常xxxx"; }
所謂結果轉換,就是將上一段任務的執行結果作為下一階段任務的入參參與重新計算,產生新的結果。
thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
應用:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int result = 100; System.out.println("一階段:" + result); return result; }).thenApply(number -> { int result = number * 3; System.out.println("二階段:" + result); return result; });
thenCompose
thenCompose 的引數為一個返回 CompletableFuture 範例的函數,該函數的引數是先前計算步驟的結果。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
應用
CompletableFuture<Integer> future = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(30); System.out.println("第一階段:" + number); return number; } }) .thenCompose(new Function<Integer, CompletionStage<Integer>>() { @Override public CompletionStage<Integer> apply(Integer param) { return CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = param * 2; System.out.println("第二階段:" + number); return number; } }); } });
thenApply 和 thenCompose的區別
thenApply 轉換的是泛型中的型別,返回的是同一個CompletableFuture;
thenCompose 將內部的 CompletableFuture 呼叫展開來並使用上一個CompletableFutre 呼叫的結果在下一步的 CompletableFuture 呼叫中進行運算,是生成一個新的CompletableFuture。
allOf方法用來實現多 CompletableFuture 的同時返回。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
範例:
@Resource private ThreadPoolTaskExecutor completableExecutor; public void test() { List<Student> allList = Lists.newCopyOnWriteArrayList(); List<Student> oneClassList = Lists.newCopyOnWriteArrayList(); List<Student> twoClassList = Lists.newCopyOnWriteArrayList(); CompletableFuture.allOf( CompletableFuture.runAsync(() -> { List<Student> firstList = service.doFirstList(); oneClassList.addAll(firstList); }, completableExecutor), CompletableFuture.runAsync(() -> { List<Student> secondList = service.doSecondList(); twoClassList.addAll(secondList); }, completableExecutor) ).join(); allList.addAll(oneClassList); allList.addAll(twoClassList); }