Java的CompletableFuture,Java的多執行緒開發

2023-05-25 15:00:27

三、Java8的CompletableFuture,Java的多執行緒開發

1、CompletableFuture的常用方法

  • 以後用到再加
runAsync() :開啟非同步(建立執行緒執行任務),無返回值
supplyAsync() :開啟非同步(建立執行緒執行任務),有返回值
thenApply() :然後應用,適用於有返回值的結果,拿著返回值再去處理。
exceptionally():用於處理非同步任務執行過程中出現異常的情況的一個方法:返回預設值或者一個替代的 CompletableFuture 物件,從而避免系統的崩潰或例外處理的問題。
handle():類似exceptionally()


get()  :阻塞執行緒:主要可以: ①獲取執行緒中的異常然後處理異常、②設定等待時間
join() :阻塞執行緒:推薦使用  join()  方法,因為它沒有受到 interrupt 的干擾,不需要捕獲異常,也不需要強制型別轉換。他自己會丟擲異常。


CompletableFuture.allOf()
CompletableFuture.anyOf()
  • get() 和 join() 方法區別?
    • 都可以阻塞執行緒 —— 等所有任務都執行完了再執行後續程式碼。
CompletableFuture 中的  get()  和  join()  方法都用於獲取非同步任務的執行結果,但是在使用時需要注意以下幾點區別: 
 
1. 丟擲異常的方式不同:如果非同步任務執行過程中出現異常, get()  方法會丟擲 ExecutionException 異常,而  join()  方法會丟擲 CompletionException 異常,這兩個異常都是繼承自 RuntimeException 的。 
 
2. 方法呼叫限制不同: join()  方法是不可以被中斷的,一旦呼叫就必須等待任務執行完成才能返回結果;而  get()  方法可以在呼叫時設定等待的超時時間,如果超時還沒有獲取到結果,就會丟擲 TimeoutException 異常。 
 
3. 返回結果型別不同: get()  方法返回的是非同步任務的執行結果,該結果是泛型型別 T 的,需要強制轉換才能獲取真正的結果;而  join()  方法返回的是非同步任務的執行結果,該結果是泛型型別 T,不需要強制轉換。 
 
4. 推薦使用方式不同:推薦在 CompletableFuture 中使用  join()  方法,因為它沒有受到 interrupt 的干擾,不需要捕獲異常,也不需要強制型別轉換。 
 
綜上所述, get()  方法和  join()  方法都是獲取非同步任務的執行結果,但是在使用時需要根據具體場景選擇使用哪個方法。如果需要獲取執行結果並且不希望被中斷,推薦使用  join()  方法;如果需要控制等待時間或者需要捕獲異常,則可以使用  get()  方法。
  • anyOf() 和 allOf() 的區別?
CompletableFuture 是 Java 8 引入的一個強大的非同步程式設計工具,它支援鏈式呼叫、組合和轉換非同步操作等功能。其中,anyOf 和 allOf 都是 CompletableFuture 的兩個常用方法,它們的區別如下: 
 
1. anyOf:任意一個 CompletableFuture 完成,它就會跟隨這個 CompletableFuture 的結果完成,返回第一個完成的 CompletableFuture 的結果。 
 
2. allOf:所有的 CompletableFuture 都完成時,它才會跟隨它們的結果完成,返回一個空的 CompletableFuture。 
 
簡而言之,anyOf 和 allOf 的最大區別是:anyOf 任意一個 CompletableFuture 完成就跟著它的結果完成,而 allOf 所有的 CompletableFuture 完成才可以完成,並返回一個空的 CompletableFuture。 
 
舉例來說,如果有三個 CompletableFuture:f1、f2、f3,其中 f1 和 f2 可能會返回一個字串,而 f3 可能會返回一個整數,那麼: 
 
- anyOf(f1, f2, f3) 的結果是 f1、f2、f3 中任意一個 CompletableFuture 的結果; 
- allOf(f1, f2, f3) 的結果是一個空的 CompletableFuture,它的完成狀態表示 f1、f2、f3 是否全部完成。 
 
總之,anyOf 和 allOf 在實際使用中可以根據不同的需求來選擇,它們都是 CompletableFuture 中非常強大的組合操作。

2、使用CompletableFuture

2.1、實體類準備

package com.cc.md.entity;

import lombok.Data;

/**
 * @author CC
 * @since 2023/5/24 0024
 */
@Data
public class UserCs {

    private String name;

    private Integer age;

}

2.2、常用方式

  • 無返回值推薦:開啟多執行緒——無返回值的——阻塞:test06
    @Resource(name = "myIoThreadPool")
    private ThreadPoolTaskExecutor myIoThreadPool;
    
    //CompletableFuture開啟多執行緒——無返回值的
    @Test
    public void test06() throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //迴圈,模仿很多工
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                //第一批建立的執行緒數
                log.info("列印:{}", finalI);
                //模仿io流耗時
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, myIoThreadPool);
            futures.add(future);
        }
        //阻塞:多執行緒的任務執行。相當於多執行緒執行完了,再執行後面的程式碼
        //如果不阻塞,上面的相當於非同步執行了。
        //阻塞方式1:可以獲取返回的異常、設定等待時間
//        futures.forEach(future -> {
//            try {
//                future.get();
//            } catch (Exception e) {
//                throw new RuntimeException(e);
//            }
//        });
        //阻塞方式2(推薦)
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        log.info("列印:都執行完了。。。");
    }
  • 有返回值推薦:開啟多執行緒——有返回值的,返回一個新的List——阻塞——使用stream流的map:test09
    • test07、test08 可以轉化為 test09 (現在這個)
    • 可以返回任務型別的值,不一定要返回下面的user物件。
    @Resource(name = "myIoThreadPool")
    private ThreadPoolTaskExecutor myIoThreadPool;
    
    //CompletableFuture開啟多執行緒——有返回值的,返回一個新的List——先有資料的情況——使用stream流的map
    //像這種,需要構建另一個陣列的,相當於一個執行緒執行完了,會有返回值
    //使用stream流的map + CompletableFuture.supplyAsync()
    @Test
    public void test09() throws Exception {
        //先獲取資料,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //莫法處理任務
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                    // 處理資料
                    user.setName(user.getName() + "-改");
                    log.info("列印-改:{}", user.getName());
                    // 其他的業務邏輯。。。

                    return user;
                }, myIoThreadPool)).collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有執行緒
                .map(CompletableFuture::join)
                //取age大於10的使用者
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

2.3、例外處理

  • exceptionally
  • handle
	//CompletableFuture 例外處理
    @Test
    public void test10() throws Exception {
        //先獲取資料,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //莫法處理任務
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                        if (user.getAge() > 5){
                            int a = 1/0;
                        }
                        // 處理資料
                        user.setName(user.getName() + "-改");
                        log.info("列印-改:{}", user.getName());
                        // 其他的業務邏輯。。。

                        return user;
                    }, myIoThreadPool)
                    //處理異常方式1:返回預設值或者一個替代的 Future 物件,從而避免系統的崩潰或例外處理的問題。
                    .exceptionally(throwable -> {
                        //可以直接獲取user
                        System.out.println("異常了:" + user);
                        //處理異常的方法……
                        //1還可以進行業務處理……比如將異常資料存起來,然後匯出……
                        //2返回預設值,如:user、null
                        //return user;
                        //3丟擲異常
                        throw new RuntimeException(throwable.getMessage());
                    })
                    //處理異常方式2:類似exceptionally(不推薦)
//                    .handle((userCs, throwable) -> {
//                        System.out.println("handle:" + user);
//                        if (throwable != null) {
//                            // 處理異常
//                            log.error("處理使用者資訊出現異常,使用者名稱為:" + user.getName(), throwable);
//                            // 返回原始資料
//                            return userCs;
//                        } else {
//                            // 返回正常資料
//                            return userCs;
//                        }
//                    })
                )
                .collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有執行緒
                .map(CompletableFuture::join)
                //取age大於10的使用者
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

2.4、CompletableFuture的使用測試

1、推薦使用:test03、test05、test09、test10、test11

2、test07、test08就是test09的前身。


  • test01:獲取當前電腦(伺服器)的cpu核數

  • test02:執行緒池原始的使用(不推薦直接這樣用)

  • test03:開啟非同步1 —— @Async

  • test04:開啟非同步2 —— CompletableFuture.runAsync()

  • test05:開啟非同步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync() —— 阻塞所有非同步方法,一起提交

    • 相當於開了3個執行緒去執行三個不同的方法,然後執行完後一起提交。
      
  • test052:開啟非同步2的改造 —— 第一個任務執行完了,獲取到返回值,給後面的執行,可以連寫,也可以單寫。 —— 阻塞執行緒:get、join

  • test06:CompletableFuture開啟多執行緒——無返回值的

  • test07:CompletableFuture開啟多執行緒——無返回值的——構建一個新List

    • 1、相當於多執行緒執行任務,然後把結果插入到List中
      2、接收多執行緒的List必須是執行緒安全的,ArrayList執行緒不安全
         執行緒安全的List —— CopyOnWriteArrayList 替代 ArrayList
      
  • test08:CompletableFuture開啟多執行緒——無返回值的——構建一個新List——先有資料的情況(基本和test07是一個方法)

  • test09:CompletableFuture開啟多執行緒——有返回值的,返回一個新的List——先有資料的情況——使用stream流的map

  • test10:CompletableFuture 例外處理。相當於是 test09的增強,處理異常

  • test11:CompletableFuture 例外處理:如果出現異常就捨棄任務

    • 1、想了一下,出現異常後的任務確實沒有執行下去了,任務不往下執行,怎麼會發現異常呢?
      2、發現了異常任務也就完了。而且列印了異常,相當於返回了異常。
      3、未發生異常的任務會執行完成。如果發生異常都返回空,最後捨棄空的,就得到任務執行成功的 CompletableFuture
      

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓所有方式↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓


package com.cc.md;

import com.cc.md.entity.UserCs;
import com.cc.md.service.IAsyncService;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@SpringBootTest
class Test01 {

    private static final Logger log = LoggerFactory.getLogger(Test01.class);

    @Resource(name = "myIoThreadPool")
    private ThreadPoolTaskExecutor myIoThreadPool;
    /**
     * 非同步類
     */
    @Resource
    private IAsyncService asyncService;

    @Test
    void test01() {
        //獲取當前jdk能呼叫的CPU個數(當前伺服器的處理器個數)
        int i = Runtime.getRuntime().availableProcessors();
        System.out.println(i);
    }

    //執行緒池原始的使用
    @Test
    void test02() {
        try {
            for (int i = 0; i < 1000; i++) {
                int finalI = i;
                myIoThreadPool.submit(() -> {
                    //第一批建立的執行緒數
                    log.info("列印:{}", finalI);
                    //模仿io流耗時
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }catch(Exception e){
            throw new RuntimeException(e);
        }finally {
            myIoThreadPool.shutdown();
        }
    }

    //開啟非同步1 —— @Async
    @Test
    public void test03() throws Exception {
        log.info("列印:{}", "非同步測試的-主方法1");
        asyncService.async1();
        asyncService.async2();
        //不會等待非同步方法執行,直接返回前端資料
        log.info("列印:{}", "非同步測試的-主方法2");
    }

    //開啟非同步2 —— CompletableFuture.runAsync()
    @Test
    public void test04() throws Exception {
        log.info("列印:{}", "非同步測試的-主方法1");
        CompletableFuture.runAsync(() -> {
            log.info("列印:{}", "非同步方法1!");
            //非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2("非同步方法1!-end");
        }, myIoThreadPool);
        //不會等待非同步方法執行,直接返回前端資料
        log.info("列印:{}", "非同步測試的-主方法2");
    }

    //非同步需要執行的方法,可以寫在同一個類中。
    private void async2(String msg) {
        //模仿io流耗時
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("列印:{}", msg);
    }

    //開啟非同步2的改造 —— CompletableFuture.runAsync() 和 supplyAsync()  —— 阻塞所有非同步方法,一起提交
    //相當於開了3個執行緒去執行三個不同的方法,然後執行完後一起提交。
    @Test
    public void test05() throws Exception {
        log.info("列印:{}", "非同步測試的-主方法1");
        //非同步執行1
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            log.info("列印:{}", "非同步方法1!");
            //非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2("非同步方法1-end");
            return "非同步方法1";
        }, myIoThreadPool);

        //非同步執行2
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            log.info("列印:{}", "非同步方法2!");
            //非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2("非同步方法2-end");
            return "非同步方法2";
        }, myIoThreadPool);

        //非同步執行3,不用我們自己的執行緒池 —— 用的就是系統自帶的 ForkJoinPool 執行緒池
        CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
            log.info("列印:{}", "非同步方法3!");
            //非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2("非同步方法3-end");
        });

        //阻塞所有非同步方法,一起提交後才走下面的程式碼
        CompletableFuture.allOf(future1, future2, future3).join();

        log.info("列印:{}", "非同步-阻塞-測試的-主方法2-end");
    }

    //開啟非同步2的改造 —— 第一個任務執行完了,獲取到返回值,給後面的執行,可以連寫,也可以單寫。 —— 阻塞執行緒:get、join
    // CompletableFuture 的 get 和 join 方法區別:
    // get:①可以獲取執行緒中的異常、②設定等待時間
    // join:推薦在 CompletableFuture 中使用  join()  方法,因為它沒有受到 interrupt 的干擾,不需要捕獲異常,也不需要強制型別轉換。
    @Test
    public void test052() throws Exception {
        log.info("列印:{}", "非同步測試的-主方法1");
        //非同步執行1
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            log.info("列印:{}", "非同步方法1!");
            // 非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
            String str = "非同步方法1-end";
            this.async2(str);
            return str;
        }, myIoThreadPool);

        // 非同步執行2 - 無返回值 —— 分開寫的方式
        CompletableFuture<Void> future2 = future1.thenAccept(str1 -> {
            log.info("列印:{}", "非同步方法2!");
            // 非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2(String.format("%s-加-非同步方法2! - 無返回值 - ",str1));
        });

        // 非同步執行3 - 有返回值 —— 分開寫future1,連寫future3方式
        CompletableFuture<String> future3 = future1.thenApply(str2 -> {
            log.info("列印:{}", "非同步方法3!");
            // 非同步執行的程式碼,也可以是方法,該方法不用單獨寫到其他類中。
            this.async2(String.format("%s-加-非同步方法3! - 有返回值 - ", str2));
            return "非同步執行3 - 有返回值 ";

            //連寫的方式。
        }).thenApply(str3 -> {
            String format = String.format("%s- end", str3);
            log.error("非同步3然後應用 - {}", format);
            //返回後面的應用
            return format;
        });
        // 獲取future3的返回值:
        //如果需要捕獲異常、設定等待超時時間,則用get
        log.info("future3的返回值(不阻塞):{}", future3.get());
//        log.info("future3的返回值(不阻塞-設定等待時間,超時報錯:TimeoutException):{}",
//                future3.get(2, TimeUnit.SECONDS));
        //推薦使用 join方法
//        log.info("future3的返回值(阻塞):{}", future3.join());

        //阻塞所有非同步方法,一起提交後才走下面的程式碼
        CompletableFuture.allOf(future1, future2).join();

        log.info("列印:{}", "非同步-阻塞-測試的-主方法2-end");
    }

    //CompletableFuture開啟多執行緒——無返回值的
    @Test
    public void test06() throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //迴圈,模仿很多工
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                //第一批建立的執行緒數
                log.info("列印:{}", finalI);
                //模仿io流耗時
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, myIoThreadPool);
            futures.add(future);
        }
        //阻塞:多執行緒的任務執行。相當於多執行緒執行完了,再執行後面的程式碼
        //如果不阻塞,上面的相當於非同步執行了。
        //阻塞方式1:可以獲取返回的異常、設定等待時間
//        futures.forEach(future -> {
//            try {
//                future.get();
//            } catch (Exception e) {
//                throw new RuntimeException(e);
//            }
//        });
        //阻塞方式2(推薦)
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        log.info("列印:都執行完了。。。");
    }

    //CompletableFuture開啟多執行緒——無返回值的——構建一個新List
    //相當於多執行緒執行任務,然後把結果插入到List中
    //接收多執行緒的List必須是執行緒安全的,ArrayList執行緒不安全
    //執行緒安全的List —— CopyOnWriteArrayList 替代 ArrayList
    @Test
    public void test07() throws Exception {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //存資料的List
        List<UserCs> addList = new CopyOnWriteArrayList<>();
        //迴圈,模仿很多工
        for (int i = 0; i < 1000; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                log.info("列印:{}", finalI);
                UserCs userCs = new UserCs();
                userCs.setName(String.format("姓名-%s", finalI));
                userCs.setAge(finalI);
                addList.add(userCs);
            }, myIoThreadPool);
            futures.add(future);
        }
        //阻塞
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

        //返回新的List:endList,取age大於10的使用者
        List<UserCs> endList = addList.stream()
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

    //CompletableFuture開啟多執行緒——無返回值的——構建一個新List——先有資料的情況
    //用CopyOnWriteArrayList 替代 ArrayList接收
    @Test
    public void test08() throws Exception {
        //先獲取資料,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //開啟多執行緒
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        //存資料的List
        List<UserCs> addList = new CopyOnWriteArrayList<>();
        //莫法處理任務
        users.forEach(user -> {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                //新增資料
                user.setName(user.getName() + "-改");
                addList.add(user);

                log.info("列印-改:{}", user.getName());
                //其他的業務邏輯。。。

            }, myIoThreadPool);
            futures.add(future);
        });

        //阻塞
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

        //返回新的List:endList
        List<UserCs> endList = addList.stream()
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

    //CompletableFuture開啟多執行緒——有返回值的,返回一個新的List——先有資料的情況——使用stream流的map
    //像這種,需要構建另一個陣列的,相當於一個執行緒執行完了,會有返回值
    //使用stream流的map + CompletableFuture.supplyAsync()
    @Test
    public void test09() throws Exception {
        //先獲取資料,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //莫法處理任務
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                    // 處理資料
                    user.setName(user.getName() + "-改");
                    log.info("列印-改:{}", user.getName());
                    // 其他的業務邏輯。。。

                    return user;
                }, myIoThreadPool)).collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有執行緒
                .map(CompletableFuture::join)
                //取age大於10的使用者
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

    //基礎資料
    private List<UserCs> getUserCs() {
        List<UserCs> users = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            UserCs userCs = new UserCs();
            userCs.setName(String.format("姓名-%s", i));
            userCs.setAge(i);
            users.add(userCs);
        }
        return users;
    }

    //CompletableFuture 例外處理
    @Test
    public void test10() throws Exception {
        //先獲取資料,需要處理的任務。
        List<UserCs> users = this.getUserCs();
        //莫法處理任務
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                        if (user.getAge() > 5){
                            int a = 1/0;
                        }
                        // 處理資料
                        user.setName(user.getName() + "-改");
                        log.info("列印-改:{}", user.getName());
                        // 其他的業務邏輯。。。

                        return user;
                    }, myIoThreadPool)
                    //處理異常方式1:返回預設值或者一個替代的 Future 物件,從而避免系統的崩潰或例外處理的問題。
                    .exceptionally(throwable -> {
                        //可以直接獲取user
                        System.out.println("異常了:" + user);
                        //處理異常的方法……
                        //1還可以進行業務處理……比如將異常資料存起來,然後匯出……
                        //2返回預設值,如:user、null
                        //return user;
                        //3丟擲異常
                        throw new RuntimeException(throwable.getMessage());
                    })
                    //處理異常方式2:類似exceptionally(不推薦)
//                    .handle((userCs, throwable) -> {
//                        System.out.println("handle:" + user);
//                        if (throwable != null) {
//                            // 處理異常
//                            log.error("處理使用者資訊出現異常,使用者名稱為:" + user.getName(), throwable);
//                            // 返回原始資料
//                            return userCs;
//                        } else {
//                            // 返回正常資料
//                            return userCs;
//                        }
//                    })
                )
                .collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有執行緒
                .map(CompletableFuture::join)
                //取age大於10的使用者
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);
    }

    //CompletableFuture 例外處理:如果出現異常就捨棄任務。
    // 想了一下,出現異常後的任務確實沒有執行下去了,任務不往下執行,怎麼會發現異常呢?
    // 發現了異常任務也就完了。而且列印了異常,相當於返回了異常。
    // 未發生異常的任務會執行完成。如果發生異常都返回空,最後捨棄空的,就得到任務執行成功的 CompletableFuture
    @Test
    public void test11() {
        List<UserCs> users = getUserCs();
        List<CompletableFuture<UserCs>> futures = users.stream()
                .map(user -> CompletableFuture.supplyAsync(() -> {
                            if (user.getAge() > 15) {
                                int a = 1 / 0;
                            }
                            user.setName(user.getName() + "-改");
                            log.info("列印-改:{}", user.getName());
                            return user;
                        }, myIoThreadPool)
                        //處理異常
                        .exceptionally(throwable -> {
                            //其他處理異常的邏輯

                            return null;
                        })
                )
                //捨棄返回的物件是null的 CompletableFuture
                .filter(e -> Objects.nonNull(e.join())).collect(Collectors.toList());

        //獲取futures
        List<UserCs> endList = futures.stream()
                //阻塞所有執行緒
                .map(CompletableFuture::join)
                //取age大於10的使用者
                .filter(user -> user.getAge() > 10)
                //按照age升序排序
                .sorted(Comparator.comparing(UserCs::getAge))
                .collect(Collectors.toList());
        log.info("列印:都執行完了。。。{}", endList);

    }

}