Java 中的資料流和函數語言程式設計

2020-02-06 00:25:00

學習如何使用 Java 8 中的流 API 和函數語言程式設計結構。

當 Java SE 8(又名核心 Java 8)在 2014 年被推出時,它引入了一些更改,從根本上影響了用它進行的程式設計。這些更改中有兩個緊密相連的部分:流 API 和函數語言程式設計構造。本文使用程式碼範例,從基礎到高階特性,介紹每個部分並說明它們之間的相互作用。

基礎特性

流 API 是在資料序列中迭代元素的簡潔而高階的方法。包 java.util.streamjava.util.function 包含了用於流 API 和相關函數語言程式設計構造的新庫。當然,程式碼範例勝過千言萬語。

下面的程式碼段用大約 2,000 個隨機整數值填充了一個 List

Random rand = new Random2();List<Integer> list = new ArrayList<Integer>();           // 空 listfor (int i = 0; i < 2048; i++) list.add(rand.nextInt()); // 填充它

另外用一個 for 迴圈可用於遍歷填充列表,以將偶數值收集到另一個列表中。

流 API 提供了一種更簡潔的方法來執行此操作:

List <Integer> evens = list    .stream()                      // 流化 list    .filter(n -> (n & 0x1) == 0)   // 過濾出奇數值    .collect(Collectors.toList()); // 收集偶數值

這個例子有三個來自流 API 的函數:

  • stream 函數可以將集合轉換為流,而流是一個每次可存取一個值的傳送帶。流化是惰性的(因此也是高效的),因為值是根據需要產生的,而不是一次性產生的。
  • filter 函數確定哪些流的值(如果有的話)通過了處理管道中的下一個階段,即 collect 階段。filter 函數是 高階的higher-order,因為它的引數是一個函數 —— 在這個例子中是一個 lambda 表示式,它是一個未命名的函數,並且是 Java 新的函數語言程式設計結構的核心。

lambda 語法與傳統的 Java 完全不同:

n -> (n & 0x1) == 0

箭頭(一個減號後面緊跟著一個大於號)將左邊的參數列與右邊的函數體分隔開。引數 n 雖未明確型別,但也可以明確。在任何情況下,編譯器都會發現 n 是個 Integer。如果有多個引數,這些引數將被括在括號中,並用逗號分隔。

在本例中,函數體檢查一個整數的最低位(最右)是否為零,這用來表示偶數。過濾器應返回一個布林值。儘管可以,但該函數的主體中沒有顯式的 return。如果主體沒有顯式的 return,則主體的最後一個表示式即是返回值。在這個例子中,主體按照 lambda 程式設計的思想編寫,由一個簡單的布林表示式 (n & 0x1) == 0 組成。

  • collect 函數將偶數值收集到參照為 evens 的列表中。如下例所示,collect 函數是執行緒安全的,因此,即使在多個執行緒之間共用了過濾操作,該函數也可以正常工作。

方便的功能和輕鬆實現多執行緒

在生產環境中,資料流的源可能是檔案或網路連線。為了學習流 API, Java 提供了諸如 IntStream 這樣的型別,它可以用各種型別的元素生成流。這裡有一個 IntStream 的例子:

IntStream                          // 整型流    .range(1, 2048)                // 生成此範圍內的整型流    .parallel()                    // 為多個執行緒分割區資料    .filter(i -> ((i & 0x1) > 0))  // 奇偶校驗 - 只允許奇數通過    .forEach(System.out::println); // 列印每個值

IntStream 型別包括一個 range 函數,該函數在指定的範圍內生成一個整數值流,在本例中,以 1 為增量,從 1 遞增到 2048。parallel 函數自動劃分該工作到多個執行緒中,在各個執行緒中進行過濾和列印。(執行緒數通常與主機系統上的 CPU 數量匹配。)函數 forEach 引數是一個方法參照,在本例中是對封裝在 System.out 中的 println 方法的參照,方法輸出型別為 PrintStream。方法和構造器參照的語法將在稍後討論。

由於具有多執行緒,因此整數值整體上以任意順序列印,但在給定執行緒中是按順序列印的。例如,如果執行緒 T1 列印 409 和 411,那麼 T1 將按照順序 409-411 列印,但是其它某個執行緒可能會預先列印 2045。parallel 呼叫後面的執行緒是並行執行的,因此它們的輸出順序是不確定的。

map/reduce 模式

map/reduce 模式在處理大型資料集方面變得很流行。一個 map/reduce 宏操作由兩個微操作構成。首先,將資料分散(對映mapped)到各個工作程式中,然後將單獨的結果收集在一起 —— 也可能收集統計起來成為一個值,即歸約reduction。歸約可以採用不同的形式,如以下範例所示。

下面 Number 類的範例用 EVENODD 表示有奇偶校驗的整數值:

public class Number {    enum Parity { EVEN, ODD }    private int value;    public Number(int n) { setValue(n); }    public void setValue(int value) { this.value = value; }    public int getValue() { return this.value; }    public Parity getParity() {        return ((value & 0x1) == 0) ? Parity.EVEN : Parity.ODD;    }    public void dump() {        System.out.format("Value: %2d (parity: %s)\n", getValue(),                          (getParity() == Parity.ODD ? "odd" : "even"));    }}

下面的程式碼演示了用 Number 流進行 map/reduce 的情形,從而表明流 API 不僅可以處理 intfloat 等基本型別,還可以處理程式設計師自定義的類型別。

在下面的程式碼段中,使用了 parallelStream 而不是 stream 函數對隨機整數值列表進行流化處理。與前面介紹的 parallel 函數一樣,parallelStream 變體也可以自動執行多執行緒。

final int howMany = 200;Random r = new Random();Number[] nums = new Number[howMany];for (int i = 0; i < howMany; i++) nums[i] = new Number(r.nextInt(100));List<Number> listOfNums = Arrays.asList(nums);  // 將陣列轉化為 listInteger sum4All = listOfNums    .parallelStream()           // 自動執行多執行緒    .mapToInt(Number::getValue) // 使用方法參照,而不是 lambda    .sum();                     // 將流值計算出和值System.out.println("The sum of the randomly generated values is: " + sum4All);

高階的 mapToInt 函數可以接受一個 lambda 作為引數,但在本例中,它接受一個方法參照,即 Number::getValuegetValue 方法不需要引數,它返回給定的 Number 範例的 int 值。語法並不複雜:類名 Number 後跟一個雙冒號和方法名。回想一下先前的例子 System.out::println,它在 System 類中的 static 屬性 out 後面有一個雙冒號。

方法參照 Number::getValue 可以用下面的 lambda 表示式替換。引數 n 是流中的 Number 範例中的之一:

mapToInt(n -> n.getValue())

通常,lambda 表示式和方法參照是可互換的:如果像 mapToInt 這樣的高階函數可以採用一種形式作為引數,那麼這個函數也可以採用另一種形式。這兩個函數語言程式設計結構具有相同的目的 —— 對作為引數傳入的資料執行一些自定義操作。在兩者之間進行選擇通常是為了方便。例如,lambda 可以在沒有封裝類的情況下編寫,而方法則不能。我的習慣是使用 lambda,除非已經有了適當的封裝方法。

當前範例末尾的 sum 函數通過結合來自 parallelStream 執行緒的部分和,以執行緒安全的方式進行歸約。但是,程式設計師有責任確保在 parallelStream 呼叫引發的多執行緒過程中,程式設計師自己的函數呼叫(在本例中為 getValue)是執行緒安全的。

最後一點值得強調。lambda 語法鼓勵編寫純函數pure function,即函數的返回值僅取決於傳入的引數(如果有);純函數沒有副作用,例如更新一個類中的 static 欄位。因此,純函數是執行緒安全的,並且如果傳遞給高階函數的函數引數(例如 filtermap )是純函數,則流 API 效果最佳。

對於更細粒度的控制,有另一個流 API 函數,名為 reduce,可用於對 Number 流中的值求和:

Integer sum4AllHarder = listOfNums    .parallelStream()                           // 多執行緒    .map(Number::getValue)                      // 每個 Number 的值    .reduce(0, (sofar, next) -> sofar + next);  // 求和

此版本的 reduce 函數帶有兩個引數,第二個引數是一個函數:

  • 第一個引數(在這種情況下為零)是特徵值,該值用作求和操作的初始值,並且在求和過程中流結束時用作預設值。
  • 第二個引數是累加器,在本例中,這個 lambda 表示式有兩個引數:第一個引數(sofar)是正在執行的和,第二個引數(next)是來自流的下一個值。執行的和以及下一個值相加,然後更新累加器。請記住,由於開始時呼叫了 parallelStream,因此 mapreduce 函數現在都在多執行緒上下文中執行。

在到目前為止的範例中,流值被收集,然後被規約,但是,通常情況下,流 API 中的 Collectors 可以累積值,而不需要將它們規約到單個值。正如下一個程式碼段所示,收集活動可以生成任意豐富的資料結構。該範例使用與前面範例相同的 listOfNums

Map<Number.Parity, List<Number>> numMap = listOfNums    .parallelStream()    .collect(Collectors.groupingBy(Number::getParity));List<Number> evens = numMap.get(Number.Parity.EVEN);List<Number> odds = numMap.get(Number.Parity.ODD);

第一行中的 numMap 指的是一個 Map,它的鍵是一個 Number 奇偶校驗位(ODDEVEN),其值是一個具有指定奇偶校驗位值的 Number 範例的 List。同樣,通過 parallelStream 呼叫進行多執行緒處理,然後 collect 呼叫(以執行緒安全的方式)將部分結果組裝到 numMap 參照的 Map 中。然後,在 numMap 上呼叫 get 方法兩次,一次獲取 evens,第二次獲取 odds

實用函數 dumpList 再次使用來自流 API 的高階 forEach 函數:

private void dumpList(String msg, List<Number> list) {    System.out.println("\n" + msg);    list.stream().forEach(n -> n.dump()); // 或者使用 forEach(Number::dump)}

這是範例執行中程式輸出的一部分:

The sum of the randomly generated values is: 3322The sum again, using a different method:     3322Evens:Value: 72 (parity: even)Value: 54 (parity: even)...Value: 92 (parity: even)Odds:Value: 35 (parity: odd)Value: 37 (parity: odd)...Value: 41 (parity: odd)

用於程式碼簡化的函數式結構

函數式結構(如方法參照和 lambda 表示式)非常適合在流 API 中使用。這些構造代表了 Java 中對高階函數的主要簡化。即使在糟糕的過去,Java 也通過 MethodConstructor 型別在技術上支援高階函數,這些型別的範例可以作為引數傳遞給其它函數。由於其複雜性,這些型別在生產級 Java 中很少使用。例如,呼叫 Method 需要物件參照(如果方法是非靜態的)或至少一個類識別符號(如果方法是靜態的)。然後,被呼叫的 Method 的引數作為物件範例傳遞給它,如果沒有發生多型(那會出現另一種複雜性!),則可能需要顯式向下轉換。相比之下,lambda 和方法參照很容易作為引數傳遞給其它函數。

但是,新的函數式結構在流 API 之外具有其它用途。考慮一個 Java GUI 程式,該程式帶有一個供使用者按下的按鈕,例如,按下以獲取當前時間。按鈕按下的事件處理程式可能編寫如下:

JButton updateCurrentTime = new JButton("Update current time");updateCurrentTime.addActionListener(new ActionListener() {    @Override    public void actionPerformed(ActionEvent e) {        currentTime.setText(new Date().toString());    }});

這個簡短的程式碼段很難解釋。關注第二行,其中方法 addActionListener 的引數開始如下:

new ActionListener() {

這似乎是錯誤的,因為 ActionListener 是一個抽象介面,而抽象型別不能通過呼叫 new 範例化。但是,事實證明,還有其它一些範例被範例化了:一個實現此介面的未命名內部類。如果上面的程式碼封裝在名為 OldJava 的類中,則該未命名的內部類將被編譯為 OldJava$1.classactionPerformed 方法在這個未命名的內部類中被重寫。

現在考慮使用新的函數式結構進行這個令人耳目一新的更改:

updateCurrentTime.addActionListener(e -> currentTime.setText(new Date().toString()));

lambda 表示式中的引數 e 是一個 ActionEvent 範例,而 lambda 的主體是對按鈕上的 setText 的簡單呼叫。

函數式介面和函陣列合

到目前為止,使用的 lambda 已經寫好了。但是,為了方便起見,我們可以像參照封裝方法一樣參照 lambda 表示式。以下一系列簡短範例說明了這一點。

考慮以下介面定義:

@FunctionalInterface // 可選,通常省略interface BinaryIntOp {    abstract int compute(int arg1, int arg2); // abstract 宣告可以被刪除}

注釋 @FunctionalInterface 適用於宣告唯一抽象方法的任何介面;在本例中,這個抽象介面是 compute。一些標準介面,(例如具有唯一宣告方法 runRunnable 介面)同樣符合這個要求。在此範例中,compute 是已宣告的方法。該介面可用作參照宣告中的目標型別:

BinaryIntOp div = (arg1, arg2) -> arg1 / arg2;div.compute(12, 3); // 4

java.util.function 提供各種函數式介面。以下是一些範例。

下面的程式碼段介紹了引數化的 Predicate 函數式介面。在此範例中,帶有引數 StringPredicate<String> 型別可以參照具有 String 引數的 lambda 表示式或諸如 isEmpty 之類的 String 方法。通常情況下,Predicate 是一個返回布林值的函數。

Predicate<String> pred = String::isEmpty; // String 方法的 predicate 宣告String[] strings = {"one", "two", "", "three", "four"};Arrays.asList(strings)   .stream()   .filter(pred)                  // 過濾掉非空字串   .forEach(System.out::println); // 只列印空字串

在字串長度為零的情況下,isEmpty Predicate 判定結果為 true。 因此,只有空字串才能進入管道的 forEach 階段。

下一段程式碼將演示如何將簡單的 lambda 或方法參照組合成更豐富的 lambda 或方法參照。考慮這一系列對 IntUnaryOperator 型別的參照的賦值,它接受一個整型引數並返回一個整型值:

IntUnaryOperator doubled = n -> n * 2;IntUnaryOperator tripled = n -> n * 3;IntUnaryOperator squared = n -> n * n;

IntUnaryOperator 是一個 FunctionalInterface,其唯一宣告的方法為 applyAsInt。現在可以單獨使用或以各種組合形式使用這三個參照 doubledtripledsquared

int arg = 5;doubled.applyAsInt(arg); // 10tripled.applyAsInt(arg); // 15squared.applyAsInt(arg); // 25

以下是一些函陣列合的樣例:

int arg = 5;doubled.compose(squared).applyAsInt(arg); // 5 求 2 次方後乘 2:50tripled.compose(doubled).applyAsInt(arg); // 5 乘 2 後再乘 3:30doubled.andThen(squared).applyAsInt(arg); // 5 乘 2 後求 2 次方:100squared.andThen(tripled).applyAsInt(arg); // 5 求 2 次方後乘 3:75

函陣列合可以直接使用 lambda 表示式實現,但是參照使程式碼更簡潔。

構造器參照

構造器參照是另一種函數語言程式設計構造,而這些參照在比 lambda 和方法參照更微妙的上下文中非常有用。再一次重申,程式碼範例似乎是最好的解釋方式。

考慮這個 POJO 類:

public class BedRocker { // 基岩的居民    private String name;    public BedRocker(String name) { this.name = name; }    public String getName() { return this.name; }    public void dump() { System.out.println(getName()); }}

該類只有一個建構函式,它需要一個 String 引數。給定一個名字陣列,目標是生成一個 BedRocker 元素陣列,每個名字代表一個元素。下面是使用了函數式結構的程式碼段:

String[] names = {"Fred", "Wilma", "Peebles", "Dino", "Baby Puss"};Stream<BedRocker> bedrockers = Arrays.asList(names).stream().map(BedRocker::new);BedRocker[] arrayBR = bedrockers.toArray(BedRocker[]::new);Arrays.asList(arrayBR).stream().forEach(BedRocker::dump);

在較高的層次上,這個程式碼段將名字轉換為 BedRocker 陣列元素。具體來說,程式碼如下所示。Stream 介面(在包 java.util.stream 中)可以被引數化,而在本例中,生成了一個名為 bedrockersBedRocker 流。

Arrays.asList 實用程式再次用於流化一個陣列 names,然後將流的每一項傳遞給 map 函數,該函數的引數現在是構造器參照 BedRocker::new。這個構造器參照通過在每次呼叫時生成和初始化一個 BedRocker 範例來充當一個物件工廠。在第二行執行之後,名為 bedrockers 的流由五項 BedRocker 組成。

這個例子可以通過關注高階 map 函數來進一步闡明。在通常情況下,一個對映將一個型別的值(例如,一個 int)轉換為另一個相同型別的值(例如,一個整數的後繼):

map(n -> n + 1) // 將 n 對映到其後繼

然而,在 BedRocker 這個例子中,轉換更加戲劇化,因為一個型別的值(代表一個名字的 String)被對映到一個不同型別的值,在這個例子中,就是一個 BedRocker 範例,這個字串就是它的名字。轉換是通過一個構造器呼叫來完成的,它是由構造器參照來實現的:

map(BedRocker::new) // 將 String 對映到 BedRocker

傳遞給構造器的值是 names 陣列中的其中一項。

此程式碼範例的第二行還演示了一個你目前已經非常熟悉的轉換:先將陣列先轉換成 List,然後再轉換成 Stream

Stream<BedRocker> bedrockers = Arrays.asList(names).stream().map(BedRocker::new);

第三行則是另一種方式 —— 流 bedrockers 通過使用陣列構造器參照 BedRocker[]::new 呼叫 toArray 方法:

BedRocker[ ] arrayBR = bedrockers.toArray(BedRocker[]::new);

該構造器參照不會建立單個 BedRocker 範例,而是建立這些範例的整個陣列:該構造器參照現在為 BedRocker[]:new,而不是 BedRocker::new。為了進行確認,將 arrayBR 轉換為 List,再次對其進行流式處理,以便可以使用 forEach 來列印 BedRocker 的名字。

FredWilmaPeeblesDinoBaby Puss

該範例對資料結構的微妙轉換僅用幾行程式碼即可完成,從而突出了可以將 lambda,方法參照或構造器參照作為引數的各種高階函數的功能。

柯里化Currying

柯里化函數是指減少函數執行任何工作所需的顯式引數的數量(通常減少到一個)。(該術語是為了紀念邏輯學家 Haskell Curry。)一般來說,函數的引數越少,呼叫起來就越容易,也更健壯。(回想一下一些需要半打左右引數的噩夢般的函數!)因此,應將柯里化視為簡化函數呼叫的一種嘗試。java.util.function 包中的介面型別適合於柯里化,如以下範例所示。

參照的 IntBinaryOperator 介面型別是為函數接受兩個整型引數,並返回一個整型值:

IntBinaryOperator mult2 = (n1, n2) -> n1 * n2;mult2.applyAsInt(10, 20); // 200mult2.applyAsInt(10, 30); // 300

參照 mult2 強調了需要兩個顯式引數,在本例中是 10 和 20。

前面介紹的 IntUnaryOperatorIntBinaryOperator 簡單,因為前者只需要一個引數,而後者則需要兩個引數。兩者均返回整數值。因此,目標是將名為 mult2 的兩個引數 IntBinraryOperator 柯里化成一個單一的 IntUnaryOperator 版本 curriedMult2

考慮 IntFunction<R> 型別。此型別的函數採用整型引數,並返回型別為 R 的結果,該結果可以是另一個函數 —— 更準確地說,是 IntBinaryOperator。讓一個 lambda 返回另一個 lambda 很簡單:

arg1 -> (arg2 -> arg1 * arg2) // 括號可以省略

完整的 lambda 以 arg1 開頭,而該 lambda 的主體以及返回的值是另一個以 arg2 開頭的 lambda。返回的 lambda 僅接受一個引數(arg2),但返回了兩個數位的乘積(arg1arg2)。下面的概述,再加上程式碼,應該可以更好地進行說明。

以下是如何柯里化 mult2 的概述:

  • 型別為 IntFunction<IntUnaryOperator> 的 lambda 被寫入並呼叫,其整型值為 10。返回的 IntUnaryOperator 快取了值 10,因此變成了已柯里化版本的 mult2,在本例中為 curriedMult2
  • 然後使用單個顯式引數(例如,20)呼叫 curriedMult2 函數,該引數與快取的引數(在本例中為 10)相乘以生成返回的乘積。。

這是程式碼的詳細資訊:

// 建立一個接受一個引數 n1 並返回一個單引數 n2 -> n1 * n2 的函數,該函數返回一個(n1 * n2 乘積的)整型數。IntFunction<IntUnaryOperator> curriedMult2Maker = n1 -> (n2 -> n1 * n2);

呼叫 curriedMult2Maker 生成所需的 IntUnaryOperator 函數:

// 使用 curriedMult2Maker 獲取已柯里化版本的 mult2。// 引數 10 是上面的 lambda 的 n1。IntUnaryOperator curriedMult2 = curriedMult2Maker2.apply(10);

10 現在快取在 curriedMult2 函數中,以便 curriedMult2 呼叫中的顯式整型引數乘以 10:

curriedMult2.applyAsInt(20); // 200 = 10 * 20curriedMult2.applyAsInt(80); // 800 = 10 * 80

快取的值可以隨意更改:

curriedMult2 = curriedMult2Maker.apply(50); // 快取 50curriedMult2.applyAsInt(101);               // 5050 = 101 * 50

當然,可以通過這種方式建立多個已柯里化版本的 mult2,每個版本都有一個 IntUnaryOperator

柯里化充分利用了 lambda 的強大功能:可以很容易地編寫 lambda 表示式來返回需要的任何型別的值,包括另一個 lambda。

總結

Java 仍然是基於類的物件導向的程式語言。但是,借助流 API 及其支援的函數式構造,Java 向函數式語言(例如 Lisp)邁出了決定性的(同時也是受歡迎的)一步。結果是 Java 更適合處理現代程式設計中常見的海量資料流。在函數式方向上的這一步還使以在前面的程式碼範例中突出顯示的管道的方式編寫清晰簡潔的 Java 程式碼更加容易:

dataStream   .parallelStream() // 多執行緒以提高效率   .filter(...)      // 階段 1   .map(...)         // 階段 2   .filter(...)      // 階段 3   ...   .collect(...);    // 或者,也可以進行歸約:階段 N

自動多執行緒,以 parallelparallelStream 呼叫為例,建立在 Java 的 fork/join 框架上,該框架支援 任務竊取task stealing 以提高效率。假設 parallelStream 呼叫後面的執行緒池由八個執行緒組成,並且 dataStream 被八種方式分割區。某個執行緒(例如,T1)可能比另一個執行緒(例如,T7)工作更快,這意味著應該將 T7 的某些任務移到 T1 的工作佇列中。這會在執行時自動發生。

在這個簡單的多執行緒世界中,程式設計師的主要職責是編寫執行緒安全函數,這些函數作為引數傳遞給在流 API 中占主導地位的高階函數。尤其是 lambda 鼓勵編寫純函數(因此是執行緒安全的)函數。