runAsync() :開啟非同步(建立執行緒執行任務),無返回值
supplyAsync() :開啟非同步(建立執行緒執行任務),有返回值
thenApply() :然後應用,適用於有返回值的結果,拿著返回值再去處理。
exceptionally():用於處理非同步任務執行過程中出現異常的情況的一個方法:返回預設值或者一個替代的 CompletableFuture 物件,從而避免系統的崩潰或例外處理的問題。
handle():類似exceptionally()
get() :阻塞執行緒:主要可以: ①獲取執行緒中的異常然後處理異常、②設定等待時間
join() :阻塞執行緒:推薦使用 join() 方法,因為它沒有受到 interrupt 的干擾,不需要捕獲異常,也不需要強制型別轉換。他自己會丟擲異常。
CompletableFuture.allOf()
CompletableFuture.anyOf()
CompletableFuture 中的 get() 和 join() 方法都用於獲取非同步任務的執行結果,但是在使用時需要注意以下幾點區別:
1. 丟擲異常的方式不同:如果非同步任務執行過程中出現異常, get() 方法會丟擲 ExecutionException 異常,而 join() 方法會丟擲 CompletionException 異常,這兩個異常都是繼承自 RuntimeException 的。
2. 方法呼叫限制不同: join() 方法是不可以被中斷的,一旦呼叫就必須等待任務執行完成才能返回結果;而 get() 方法可以在呼叫時設定等待的超時時間,如果超時還沒有獲取到結果,就會丟擲 TimeoutException 異常。
3. 返回結果型別不同: get() 方法返回的是非同步任務的執行結果,該結果是泛型型別 T 的,需要強制轉換才能獲取真正的結果;而 join() 方法返回的是非同步任務的執行結果,該結果是泛型型別 T,不需要強制轉換。
4. 推薦使用方式不同:推薦在 CompletableFuture 中使用 join() 方法,因為它沒有受到 interrupt 的干擾,不需要捕獲異常,也不需要強制型別轉換。
綜上所述, get() 方法和 join() 方法都是獲取非同步任務的執行結果,但是在使用時需要根據具體場景選擇使用哪個方法。如果需要獲取執行結果並且不希望被中斷,推薦使用 join() 方法;如果需要控制等待時間或者需要捕獲異常,則可以使用 get() 方法。
CompletableFuture 是 Java 8 引入的一個強大的非同步程式設計工具,它支援鏈式呼叫、組合和轉換非同步操作等功能。其中,anyOf 和 allOf 都是 CompletableFuture 的兩個常用方法,它們的區別如下:
1. anyOf:任意一個 CompletableFuture 完成,它就會跟隨這個 CompletableFuture 的結果完成,返回第一個完成的 CompletableFuture 的結果。
2. allOf:所有的 CompletableFuture 都完成時,它才會跟隨它們的結果完成,返回一個空的 CompletableFuture。
簡而言之,anyOf 和 allOf 的最大區別是:anyOf 任意一個 CompletableFuture 完成就跟著它的結果完成,而 allOf 所有的 CompletableFuture 完成才可以完成,並返回一個空的 CompletableFuture。
舉例來說,如果有三個 CompletableFuture:f1、f2、f3,其中 f1 和 f2 可能會返回一個字串,而 f3 可能會返回一個整數,那麼:
- anyOf(f1, f2, f3) 的結果是 f1、f2、f3 中任意一個 CompletableFuture 的結果;
- allOf(f1, f2, f3) 的結果是一個空的 CompletableFuture,它的完成狀態表示 f1、f2、f3 是否全部完成。
總之,anyOf 和 allOf 在實際使用中可以根據不同的需求來選擇,它們都是 CompletableFuture 中非常強大的組合操作。
package com.cc.md.entity;
import lombok.Data;
/**
* @author CC
* @since 2023/5/24 0024
*/
@Data
public class UserCs {
private String name;
private Integer age;
}
@Resource(name = "myIoThreadPool")
private ThreadPoolTaskExecutor myIoThreadPool;
//CompletableFuture開啟多執行緒——無返回值的
@Test
public void test06() throws Exception {
List<CompletableFuture<Void>> futures = new ArrayList<>();
//迴圈,模仿很多工
for (int i = 0; i < 1000; i++) {
int finalI = i;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
//第一批建立的執行緒數
log.info("列印:{}", finalI);
//模仿io流耗時
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, myIoThreadPool);
futures.add(future);
}
//阻塞:多執行緒的任務執行。相當於多執行緒執行完了,再執行後面的程式碼
//如果不阻塞,上面的相當於非同步執行了。
//阻塞方式1:可以獲取返回的異常、設定等待時間
// futures.forEach(future -> {
// try {
// future.get();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// });
//阻塞方式2(推薦)
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
log.info("列印:都執行完了。。。");
}
@Resource(name = "myIoThreadPool")
private ThreadPoolTaskExecutor myIoThreadPool;
//CompletableFuture開啟多執行緒——有返回值的,返回一個新的List——先有資料的情況——使用stream流的map
//像這種,需要構建另一個陣列的,相當於一個執行緒執行完了,會有返回值
//使用stream流的map + CompletableFuture.supplyAsync()
@Test
public void test09() throws Exception {
//先獲取資料,需要處理的任務。
List<UserCs> users = this.getUserCs();
//莫法處理任務
List<CompletableFuture<UserCs>> futures = users.stream()
.map(user -> CompletableFuture.supplyAsync(() -> {
// 處理資料
user.setName(user.getName() + "-改");
log.info("列印-改:{}", user.getName());
// 其他的業務邏輯。。。
return user;
}, myIoThreadPool)).collect(Collectors.toList());
//獲取futures
List<UserCs> endList = futures.stream()
//阻塞所有執行緒
.map(CompletableFuture::join)
//取age大於10的使用者
.filter(user -> user.getAge() > 10)
//按照age升序排序
.sorted(Comparator.comparing(UserCs::getAge))
.collect(Collectors.toList());
log.info("列印:都執行完了。。。{}", endList);
}
//CompletableFuture 例外處理
@Test
public void test10() throws Exception {
//先獲取資料,需要處理的任務。
List<UserCs> users = this.getUserCs();
//莫法處理任務
List<CompletableFuture<UserCs>> futures = users.stream()
.map(user -> CompletableFuture.supplyAsync(() -> {
if (user.getAge() > 5){
int a = 1/0;
}
// 處理資料
user.setName(user.getName() + "-改");
log.info("列印-改:{}", user.getName());
// 其他的業務邏輯。。。
return user;
}, myIoThreadPool)
//處理異常方式1:返回預設值或者一個替代的 Future 物件,從而避免系統的崩潰或例外處理的問題。
.exceptionally(throwable -> {
//可以直接獲取user
System.out.println("異常了:" + user);
//處理異常的方法……
//1還可以進行業務處理……比如將異常資料存起來,然後匯出……
//2返回預設值,如:user、null
//return user;
//3丟擲異常
throw new RuntimeException(throwable.getMessage());
})
//處理異常方式2:類似exceptionally(不推薦)
// .handle((userCs, throwable) -> {
// System.out.println("handle:" + user);
// if (throwable != null) {
// // 處理異常
// log.error("處理使用者資訊出現異常,使用者名稱為:" + user.getName(), throwable);
// // 返回原始資料
// return userCs;
// } else {
// // 返回正常資料
// return userCs;
// }
// })
)
.collect(Collectors.toList());
//獲取futures
List<UserCs> endList = futures.stream()
//阻塞所有執行緒
.map(CompletableFuture::join)
//取age大於10的使用者
.filter(user -> user.getAge() > 10)
//按照age升序排序
.sorted(Comparator.comparing(UserCs::getAge))
.collect(Collectors.toList());
log.info("列印:都執行完了。。。{}", endList);
}
1、推薦使用:test03、test05、test09、test10、test11
2、test07、test08就是test09的前身。
test01:獲取當前電腦(伺服器)的cpu核數
test02:執行緒池原始的使用(不推薦直接這樣用)
test03:開啟非同步1 —— @Async
test04:開啟非同步2 —— CompletableFuture.runAsync()
test05:開啟非同步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync() —— 阻塞所有非同步方法,一起提交
相當於開了3個執行緒去執行三個不同的方法,然後執行完後一起提交。
test052:開啟非同步2的改造 —— 第一個任務執行完了,獲取到返回值,給後面的執行,可以連寫,也可以單寫。 —— 阻塞執行緒:get、join
test06:CompletableFuture開啟多執行緒——無返回值的
test07:CompletableFuture開啟多執行緒——無返回值的——構建一個新List
1、相當於多執行緒執行任務,然後把結果插入到List中
2、接收多執行緒的List必須是執行緒安全的,ArrayList執行緒不安全
執行緒安全的List —— CopyOnWriteArrayList 替代 ArrayList
test08:CompletableFuture開啟多執行緒——無返回值的——構建一個新List——先有資料的情況(基本和test07是一個方法)
test09:CompletableFuture開啟多執行緒——有返回值的,返回一個新的List——先有資料的情況——使用stream流的map
test10:CompletableFuture 例外處理。相當於是 test09的增強,處理異常
test11:CompletableFuture 例外處理:如果出現異常就捨棄任務。
1、想了一下,出現異常後的任務確實沒有執行下去了,任務不往下執行,怎麼會發現異常呢?
2、發現了異常任務也就完了。而且列印了異常,相當於返回了異常。
3、未發生異常的任務會執行完成。如果發生異常都返回空,最後捨棄空的,就得到任務執行成功的 CompletableFuture
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓所有方式↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
package com.cc.md;
import com.cc.md.entity.UserCs;
import com.cc.md.service.IAsyncService;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@SpringBootTest
class Test01 {
private static final Logger log = LoggerFactory.getLogger(Test01.class);
@Resource(name = "myIoThreadPool")
private ThreadPoolTaskExecutor myIoThreadPool;
/**
* 非同步類
*/
@Resource
private IAsyncService asyncService;
@Test
void test01() {
//獲取當前jdk能呼叫的CPU個數(當前伺服器的處理器個數)
int i = Runtime.getRuntime().availableProcessors();
System.out.println(i);
}
//執行緒池原始的使用
@Test
void test02() {
try {
for (int i = 0; i < 1000; i++) {
int finalI = i;
myIoThreadPool.submit(() -> {
//第一批建立的執行緒數
log.info("列印:{}", finalI);
//模仿io流耗時
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}catch(Exception e){
throw new RuntimeException(e);
}finally {
myIoThreadPool.shutdown();
}
}
//開啟非同步1 —— @Async
@Test
public void test03() throws Exception {
log.info("列印:{}", "非同步測試的-主方法1");
asyncService.async1();
asyncService.async2();
//不會等待非同步方法執行,直接返回前端資料
log.info("列印:{}", "非同步測試的-主方法2");
}
//開啟非同步2 —— CompletableFuture.runAsync()
@Test
public void test04() throws Exception {
log.info("列印:{}", "非同步測試的-主方法1");
CompletableFuture.runAsync(() -> {
log.info("列印:{}", "非同步方法1!");
//非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
this.async2("非同步方法1!-end");
}, myIoThreadPool);
//不會等待非同步方法執行,直接返回前端資料
log.info("列印:{}", "非同步測試的-主方法2");
}
//非同步需要執行的方法,可以寫在同一個類中。
private void async2(String msg) {
//模仿io流耗時
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("列印:{}", msg);
}
//開啟非同步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync() —— 阻塞所有非同步方法,一起提交
//相當於開了3個執行緒去執行三個不同的方法,然後執行完後一起提交。
@Test
public void test05() throws Exception {
log.info("列印:{}", "非同步測試的-主方法1");
//非同步執行1
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
log.info("列印:{}", "非同步方法1!");
//非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
this.async2("非同步方法1-end");
return "非同步方法1";
}, myIoThreadPool);
//非同步執行2
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
log.info("列印:{}", "非同步方法2!");
//非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
this.async2("非同步方法2-end");
return "非同步方法2";
}, myIoThreadPool);
//非同步執行3,不用我們自己的執行緒池 —— 用的就是系統自帶的 ForkJoinPool 執行緒池
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
log.info("列印:{}", "非同步方法3!");
//非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
this.async2("非同步方法3-end");
});
//阻塞所有非同步方法,一起提交後才走下面的程式碼
CompletableFuture.allOf(future1, future2, future3).join();
log.info("列印:{}", "非同步-阻塞-測試的-主方法2-end");
}
//開啟非同步2的改造 —— 第一個任務執行完了,獲取到返回值,給後面的執行,可以連寫,也可以單寫。 —— 阻塞執行緒:get、join
// CompletableFuture 的 get 和 join 方法區別:
// get:①可以獲取執行緒中的異常、②設定等待時間
// join:推薦在 CompletableFuture 中使用 join() 方法,因為它沒有受到 interrupt 的干擾,不需要捕獲異常,也不需要強制型別轉換。
@Test
public void test052() throws Exception {
log.info("列印:{}", "非同步測試的-主方法1");
//非同步執行1
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
log.info("列印:{}", "非同步方法1!");
// 非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
String str = "非同步方法1-end";
this.async2(str);
return str;
}, myIoThreadPool);
// 非同步執行2 - 無返回值 —— 分開寫的方式
CompletableFuture<Void> future2 = future1.thenAccept(str1 -> {
log.info("列印:{}", "非同步方法2!");
// 非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
this.async2(String.format("%s-加-非同步方法2! - 無返回值 - ",str1));
});
// 非同步執行3 - 有返回值 —— 分開寫future1,連寫future3方式
CompletableFuture<String> future3 = future1.thenApply(str2 -> {
log.info("列印:{}", "非同步方法3!");
// 非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
this.async2(String.format("%s-加-非同步方法3! - 有返回值 - ", str2));
return "非同步執行3 - 有返回值 ";
//連寫的方式。
}).thenApply(str3 -> {
String format = String.format("%s- end", str3);
log.error("非同步3然後應用 - {}", format);
//返回後面的應用
return format;
});
// 獲取future3的返回值:
//如果需要捕獲異常、設定等待超時時間,則用get
log.info("future3的返回值(不阻塞):{}", future3.get());
// log.info("future3的返回值(不阻塞-設定等待時間,超時報錯:TimeoutException):{}",
// future3.get(2, TimeUnit.SECONDS));
//推薦使用 join方法
// log.info("future3的返回值(阻塞):{}", future3.join());
//阻塞所有非同步方法,一起提交後才走下面的程式碼
CompletableFuture.allOf(future1, future2).join();
log.info("列印:{}", "非同步-阻塞-測試的-主方法2-end");
}
//CompletableFuture開啟多執行緒——無返回值的
@Test
public void test06() throws Exception {
List<CompletableFuture<Void>> futures = new ArrayList<>();
//迴圈,模仿很多工
for (int i = 0; i < 1000; i++) {
int finalI = i;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
//第一批建立的執行緒數
log.info("列印:{}", finalI);
//模仿io流耗時
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, myIoThreadPool);
futures.add(future);
}
//阻塞:多執行緒的任務執行。相當於多執行緒執行完了,再執行後面的程式碼
//如果不阻塞,上面的相當於非同步執行了。
//阻塞方式1:可以獲取返回的異常、設定等待時間
// futures.forEach(future -> {
// try {
// future.get();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// });
//阻塞方式2(推薦)
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
log.info("列印:都執行完了。。。");
}
//CompletableFuture開啟多執行緒——無返回值的——構建一個新List
//相當於多執行緒執行任務,然後把結果插入到List中
//接收多執行緒的List必須是執行緒安全的,ArrayList執行緒不安全
//執行緒安全的List —— CopyOnWriteArrayList 替代 ArrayList
@Test
public void test07() throws Exception {
List<CompletableFuture<Void>> futures = new ArrayList<>();
//存資料的List
List<UserCs> addList = new CopyOnWriteArrayList<>();
//迴圈,模仿很多工
for (int i = 0; i < 1000; i++) {
int finalI = i;
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
log.info("列印:{}", finalI);
UserCs userCs = new UserCs();
userCs.setName(String.format("姓名-%s", finalI));
userCs.setAge(finalI);
addList.add(userCs);
}, myIoThreadPool);
futures.add(future);
}
//阻塞
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
//返回新的List:endList,取age大於10的使用者
List<UserCs> endList = addList.stream()
.filter(user -> user.getAge() > 10)
//按照age升序排序
.sorted(Comparator.comparing(UserCs::getAge))
.collect(Collectors.toList());
log.info("列印:都執行完了。。。{}", endList);
}
//CompletableFuture開啟多執行緒——無返回值的——構建一個新List——先有資料的情況
//用CopyOnWriteArrayList 替代 ArrayList接收
@Test
public void test08() throws Exception {
//先獲取資料,需要處理的任務。
List<UserCs> users = this.getUserCs();
//開啟多執行緒
List<CompletableFuture<Void>> futures = new ArrayList<>();
//存資料的List
List<UserCs> addList = new CopyOnWriteArrayList<>();
//莫法處理任務
users.forEach(user -> {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
//新增資料
user.setName(user.getName() + "-改");
addList.add(user);
log.info("列印-改:{}", user.getName());
//其他的業務邏輯。。。
}, myIoThreadPool);
futures.add(future);
});
//阻塞
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
//返回新的List:endList
List<UserCs> endList = addList.stream()
.filter(user -> user.getAge() > 10)
//按照age升序排序
.sorted(Comparator.comparing(UserCs::getAge))
.collect(Collectors.toList());
log.info("列印:都執行完了。。。{}", endList);
}
//CompletableFuture開啟多執行緒——有返回值的,返回一個新的List——先有資料的情況——使用stream流的map
//像這種,需要構建另一個陣列的,相當於一個執行緒執行完了,會有返回值
//使用stream流的map + CompletableFuture.supplyAsync()
@Test
public void test09() throws Exception {
//先獲取資料,需要處理的任務。
List<UserCs> users = this.getUserCs();
//莫法處理任務
List<CompletableFuture<UserCs>> futures = users.stream()
.map(user -> CompletableFuture.supplyAsync(() -> {
// 處理資料
user.setName(user.getName() + "-改");
log.info("列印-改:{}", user.getName());
// 其他的業務邏輯。。。
return user;
}, myIoThreadPool)).collect(Collectors.toList());
//獲取futures
List<UserCs> endList = futures.stream()
//阻塞所有執行緒
.map(CompletableFuture::join)
//取age大於10的使用者
.filter(user -> user.getAge() > 10)
//按照age升序排序
.sorted(Comparator.comparing(UserCs::getAge))
.collect(Collectors.toList());
log.info("列印:都執行完了。。。{}", endList);
}
//基礎資料
private List<UserCs> getUserCs() {
List<UserCs> users = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
UserCs userCs = new UserCs();
userCs.setName(String.format("姓名-%s", i));
userCs.setAge(i);
users.add(userCs);
}
return users;
}
//CompletableFuture 例外處理
@Test
public void test10() throws Exception {
//先獲取資料,需要處理的任務。
List<UserCs> users = this.getUserCs();
//莫法處理任務
List<CompletableFuture<UserCs>> futures = users.stream()
.map(user -> CompletableFuture.supplyAsync(() -> {
if (user.getAge() > 5){
int a = 1/0;
}
// 處理資料
user.setName(user.getName() + "-改");
log.info("列印-改:{}", user.getName());
// 其他的業務邏輯。。。
return user;
}, myIoThreadPool)
//處理異常方式1:返回預設值或者一個替代的 Future 物件,從而避免系統的崩潰或例外處理的問題。
.exceptionally(throwable -> {
//可以直接獲取user
System.out.println("異常了:" + user);
//處理異常的方法……
//1還可以進行業務處理……比如將異常資料存起來,然後匯出……
//2返回預設值,如:user、null
//return user;
//3丟擲異常
throw new RuntimeException(throwable.getMessage());
})
//處理異常方式2:類似exceptionally(不推薦)
// .handle((userCs, throwable) -> {
// System.out.println("handle:" + user);
// if (throwable != null) {
// // 處理異常
// log.error("處理使用者資訊出現異常,使用者名稱為:" + user.getName(), throwable);
// // 返回原始資料
// return userCs;
// } else {
// // 返回正常資料
// return userCs;
// }
// })
)
.collect(Collectors.toList());
//獲取futures
List<UserCs> endList = futures.stream()
//阻塞所有執行緒
.map(CompletableFuture::join)
//取age大於10的使用者
.filter(user -> user.getAge() > 10)
//按照age升序排序
.sorted(Comparator.comparing(UserCs::getAge))
.collect(Collectors.toList());
log.info("列印:都執行完了。。。{}", endList);
}
//CompletableFuture 例外處理:如果出現異常就捨棄任務。
// 想了一下,出現異常後的任務確實沒有執行下去了,任務不往下執行,怎麼會發現異常呢?
// 發現了異常任務也就完了。而且列印了異常,相當於返回了異常。
// 未發生異常的任務會執行完成。如果發生異常都返回空,最後捨棄空的,就得到任務執行成功的 CompletableFuture
@Test
public void test11() {
List<UserCs> users = getUserCs();
List<CompletableFuture<UserCs>> futures = users.stream()
.map(user -> CompletableFuture.supplyAsync(() -> {
if (user.getAge() > 15) {
int a = 1 / 0;
}
user.setName(user.getName() + "-改");
log.info("列印-改:{}", user.getName());
return user;
}, myIoThreadPool)
//處理異常
.exceptionally(throwable -> {
//其他處理異常的邏輯
return null;
})
)
//捨棄返回的物件是null的 CompletableFuture
.filter(e -> Objects.nonNull(e.join())).collect(Collectors.toList());
//獲取futures
List<UserCs> endList = futures.stream()
//阻塞所有執行緒
.map(CompletableFuture::join)
//取age大於10的使用者
.filter(user -> user.getAge() > 10)
//按照age升序排序
.sorted(Comparator.comparing(UserCs::getAge))
.collect(Collectors.toList());
log.info("列印:都執行完了。。。{}", endList);
}
}