響應式程式設計——初識 Flux 和 Mono

2023-09-07 13:28:44

by emanjusaka from ​ https://www.emanjusaka.top/archives/4 彼岸花開可奈何
本文歡迎分享與聚合,全文轉載請留下原文地址。

前言

Reactor 是一個響應式程式設計的基礎類庫,其中有兩個很關鍵的類:Flux 和 Mono。掌握這兩個類和相關概念有助於我們學習響應式程式設計。

Flux 和 Mono 都是資料流的釋出者,使用 Flux 和 Mono 都可以發出三種資料訊號:元素值,錯誤訊號,完成訊號;錯誤訊號和完成訊號都代表終止訊號,終止訊號用於告訴訂閱者資料流結束了,錯誤訊號終止資料流同時把錯誤資訊傳遞給訂閱者。

一、Flux

具有 rx 運運算元的響應式流釋出器,發出 0 到 N 個元素,然後完成(成功或有錯誤)。

下圖顯示了 Flux 如何轉換專案:

Flux是一個標準的Publisher,表示一個非同步的0到N個發出的專案序列,可選擇終止於完成訊號或錯誤訊號。根據Reactive Streams規範,這三種型別的訊號轉換為對下游Subscriber的onNext、onComplete和onError方法的呼叫。
由於可能出現的訊號範圍很大,Flux是通用的響應式型別。請注意,所有事件,包括終止事件,都是可選的:沒有onNext事件但有onComplete事件表示一個空的有限序列,但如果去掉onComplete,則得到一個無限的空序列(除了用於取消測試之外,不是特別有用)。同樣,無限序列不一定為空。例如,Flux.interval(Duration)會生成一個無限的Flux,從時鐘發出定期的滴答聲。Flux 是標準的 Publisher,它表示 0 到 N 個發出項的非同步序列,可以選擇由完成訊號或錯誤終止。與 Reactive Streams 規範中一樣,這三種型別的訊號轉換為對下游訂閱者的 onNext、onComplete 和 onError 方法的呼叫。

憑藉如此大範圍的可能訊號,Flux 是通用的無功型別。請注意,所有事件,甚至終止事件,都是可選的:沒有 onNext 事件,但 onComplete 事件表示一個空的有限序列,但刪除 onComplete 並且您有一個無限的空序列(不是特別有用,除了圍繞取消的測試)。同樣,無限序列不一定是空的。例如, Flux.interval(Duration) 生成無限的 Flux 並從時鐘發出規則的滴答聲。

二、Mono

具有基本 rx 運運算元的 Reactive Streams Publisher 通過 onNext 訊號最多發出一項,然後以 onComplete 訊號終止(成功的 Mono,有或沒有值),或者僅發出單個 onError 訊號(失敗的 Mono)。

下圖顯示了 Mono 如何轉換專案:

Mono是一種特殊的Publisher,通過onNext訊號發出最多一個專案,然後通過onComplete訊號終止(成功的Mono,有或沒有值),或者只發出一個onError訊號(失敗的Mono)。
大多數Mono實現在呼叫onNext後立即呼叫其Subscriber的onComplete。Mono.never()是一個例外:它不發出任何訊號,在技術上並不禁止,但在測試之外沒有太大用處。另一方面,明確禁止使用onNext和onError的組合。
Mono只提供了Flux可用的操作符的子集,而某些操作符(特別是將Mono與另一個Publisher組合的操作符)會切換到Flux。例如,Mono#concatWith(Publisher)返回一個Flux,而Monothen(Mono)返回另一個Mono。
請注意,您可以使用Mono來表示只有完成概念的無值非同步過程(類似於Runnable)。要建立一個,您可以使用一個空的Mono

三、程式碼範例

  1. 建立一個Flux,發出一系列字串元素並訂閱列印出來:

    package top.emanjusaka;
    import reactor.core.publisher.Flux;
    public class Main {
        public static void main(String[] args) {
            Flux<String> flux = Flux.just("Hello", "emanjusaka", "!");
            flux.subscribe(System.out::println);
        }
    }
    
    // 輸出
    Hello
    emanjusaka
    !
    
  2. 建立一個Mono,發出一個字串元素並訂閱列印出來:

    package top.emanjusaka;
    import reactor.core.publisher.Mono;
    public class Main {
        public static void main(String[] args) {
            Mono<String> mono = Mono.just("Hello");
            mono.subscribe(System.out::println);
        }
    }
    
    // 輸出
    Hello
    
  3. 使用Flux的操作符進行元素轉換和過濾:

    package top.emanjusaka;
    
    import reactor.core.publisher.Flux;
    
    public class Main {
        public static void main(String[] args) {
            Flux<Integer> numbers = Flux.range(1, 10);
            numbers.map(num -> num * 2)
                    .filter(num -> num % 3 == 0)
                    .subscribe(System.out::println);
        }
    }
    
    // 輸出
    6
    12
    18
    
  4. 使用Mono的操作符進行元素轉換和錯誤處理:

    package top.emanjusaka;
    
    import reactor.core.publisher.Mono;
    
    public class Main {
        public static void main(String[] args) {
            Mono<Integer> number = Mono.just(5);
            number.map(num -> num * 2)
                    .doOnError(Throwable::printStackTrace)
                    .subscribe(System.out::println);
        }
    }
    
    
    // 輸出
    10
    

四、總結

Flux 和 Mono 都是位於 reactor.core.publisher包下的類。

Reactor中的Flux和Mono是用於實現響應式程式設計的兩種基本型別:

  1. Flux:表示一個非同步序列,可以發出0到N個專案。它可以終止於完成訊號或錯誤訊號。Flux適用於處理多個專案的情況,可以使用各種操作符來處理和轉換序列。
  2. Mono:表示一個非同步序列,最多發出一個專案。它要麼終止於完成訊號(有或沒有值),要麼只發出一個錯誤訊號。Mono適用於處理單個專案的情況,也可以使用一些操作符來處理和轉換序列。

這兩種型別都是Publisher的實現,遵循Reactive Streams規範,並可以與其他響應式庫和框架進行互操作。

Flux和Mono都可以表示無限序列,也可以表示空序列。它們提供了豐富的操作符來處理和轉換序列,例如對映、過濾、合併、扁平化等。此外,它們還支援非同步和並行處理,可以與其他操作符和操作進行組合使用。

總的來說,Flux適用於處理多個專案的情況,而Mono適用於處理單個專案的情況。它們是Reactor中用於實現響應式程式設計的基本型別,提供了豐富的操作符和功能來處理和轉換非同步序列。

五、參考文獻

  1. 《Reactor》參考檔案

本文原創,才疏學淺,如有紕漏,歡迎指正。尊貴的朋友,如果本文對您有所幫助,歡迎點贊,並期待您的反饋,以便於不斷優化。

原文地址: https://www.emanjusaka.top/archives/4

微信公眾號:emanjusaka的程式設計棧