雲原生時代頂流訊息中介軟體Apache Pulsar部署實操之輕量級計算框架

2023-03-07 21:00:55

@

Pulsar Functions(輕量級計算框架)

基礎定義

Function instance(函數範例)是函數執行框架的核心元素,由以下元素組成:

  • 消費來自不同輸入主題的訊息的消費者的集合。
  • 呼叫函數的執行程式。
  • 將函數的結果傳送到輸出主題的生產者。

函數範例的內部工作流

  • 一個函數可以有多個範例,每個範例執行一個函數的副本。可以在組態檔中指定範例數。
  • 函數範例中的使用者使用FQFN作為訂閱者名,以基於訂閱型別在多個範例之間實現負載平衡。訂閱型別可以在函數級別指定。
  • 每個函數都有一個單獨的FQFN狀態儲存。可指定一個狀態介面,以便在BookKeeper中持久化中間結果。其他使用者可以查詢函數的狀態並提取這些結果。

工作流程

Function worker 是一個邏輯元件,用於在Pulsar Functions的叢集模式部署中監視、編排和執行單個函數。每個函數範例都可以作為執行緒或程序執行,具體取決於所選的設定。如果Kubernetes叢集可用,則可以在Kubernetes中以StatefulSets的形式生成函數。

Function worker的內部架構和工作流如下

  • 使用者向REST伺服器傳送請求以執行函數範例。
  • REST伺服器響應請求並將請求傳遞給功能後設資料管理器。
  • 函數後設資料管理器將請求更新寫入函數後設資料主題。並跟蹤所有與後設資料相關的訊息,並使用函數後設資料主題持久化函數的狀態更新。
  • 函數後設資料管理器從函數後設資料主題讀取更新,並觸發排程管理器計算分配。
  • 日程管理器將作業更新寫入作業主題。
  • 函數執行時管理器偵聽分配主題,讀取分配更新,並更新其內部狀態,該狀態包含所有工作人員的所有分配的全域性檢視;如果更新更改了工作物件上的賦值,函數執行時管理器將通過啟動或停止函數範例的執行來具體化新的賦值。
  • 會員管理要求協調主題選舉一個領導工作者,所有工作人員都訂閱故障轉移訂閱中的協調主題,但活動的工作人員成為領導者並執行分配,從而保證該主題只有一個活動消費者。
  • 成員管理器從協調主題讀取更新。

函數執行時

函數範例是在執行時內呼叫的,許多範例可以並行執行。Pulsar支援三種不同成本和隔離保證的函數執行時型別,以最大限度地提高部署靈活性。可以根據需要使用其中之一來執行函數。

  • 執行緒執行時:每個範例都作為一個執行緒執行。由於執行緒模式的程式碼是用Java編寫的,所以它只適用於Java範例。當函數以執行緒模式執行時,它與函數工作者執行在同一個Java虛擬機器器(JVM)上。
  • 程序執行時:每個範例都作為一個程序執行。當函數以程序模式執行時,它執行在函數工作者執行的同一臺機器上。
  • Kubernetes執行時:函數由worker以Kubernetes StatefulSet的形式提交,每個函數範例作為pod執行。Pulsar支援在啟動函數時向Kubernetes StatefulSets和服務新增標籤,這有助於選擇目標Kubernetes物件。

處理保證和訂閱型別

Pulsar提供了三種不同的訊息傳遞語意,可以將它們應用於一個函數。根據ack時間節點確定不同的傳遞語意實現。

  • At-most-once:最多一次,傳送到函數的每個訊息都將盡最大努力處理。不能保證訊息是否會被處理。當選擇此語意時,autoAck設定必須設定為true,否則啟動將失敗(autoAck設定將在將來的版本中棄用)。Ack時間節點:函數處理之前。
  • At-least-once:最少一次,傳送到函數的每個訊息都可以被處理多次(以防處理失敗或重新交付)。如果建立函數時沒有指定——processing- guaranteed標誌,則該函數提供至少一次交付保證。Ack時間節點:傳送訊息到輸出後。
  • Effectively-once:精確一次,送到函數的每條訊息都可以被處理多次,但它只有一個輸出。重複的訊息將被忽略。有效地在至少一次處理和有保證的伺服器端重複資料刪除的基礎上實現一次。這意味著一個狀態更新可以發生兩次,但是相同的狀態更新只應用一次,另一個重複的狀態更新在伺服器端被丟棄。Ack時間節點:傳送訊息到輸出後。
  • Manual:當選擇這個語意時,框架不會執行任何ack操作,需要在函數中呼叫context.getCurrentRecord().ack()方法來手動執行ack操作。Ack時間節點:在函數方法中自定義。

提示

  • 預設情況下,Pulsar函數提供至少一次交付保證。如果建立函數時沒有為——processing guaranteed標誌提供值,則該函數提供至少一次保證。
  • 排他訂閱型別在Pulsar函數中不可用,原因:
    • 如果只有一個範例,exclusive等於故障轉移。
    • 如果有多個範例,exclusive可能會在函數重新啟動時崩潰並重新啟動。在這種情況下,排他不等於故障轉移。因為當主消費者斷開連線時,所有未確認的和後續的訊息都被傳遞到下一個。
  • 要將訂閱型別從shared更改為key_shared,可以在pulse -admin中使用- retain-key- ordered選項。

可以在建立函數時設定函數的處理保證。如下面的命令建立了一個應用了「精確一次」保證的函數。

bin/pulsar-admin functions create \
  --name my-effectively-once-function \
  --processing-guarantees EFFECTIVELY_ONCE \

可以使用update命令更改應用於函數的處理保證。

bin/pulsar-admin functions update \
  --processing-guarantees ATMOST_ONCE \

視窗函數

定義

目前,視窗函數僅在Java中可用,並且不支援MANUAL和effective -once delivery語意。視窗函數是跨資料視窗(即事件流的有限子集)執行計算的函數。如下圖所示,流被劃分為「桶」,其中可以應用函數。

函數的資料視窗定義包含兩個策略:

  • 清除策略:控制在視窗中收集的資料量。
  • 觸發策略:控制何時觸發一個函數並執行該函數以根據清除策略處理視窗中收集的所有資料。

觸發策略和驅逐策略都由時間或計數驅動。

提示

  • 同時支援處理時間和事件時間。
  • 處理時間是根據函數範例構建和處理視窗時的壁時間定義的。視窗完整性的判斷很簡單,您不必擔心資料到達混亂。

視窗型別

捲動視窗

捲動視窗將元素分配給具有指定時間長度或計數的視窗。捲動視窗的驅逐策略總是基於視窗已滿。因此只需要指定觸發器策略,基於計數或基於時間。在具有基於計數的觸發策略的捲動視窗中,如以下範例所示,觸發策略被設定為2。當視窗中有兩個專案時,無論時間如何,都會觸發並執行每個函數。

相反,如下面的範例所示,捲動視窗的視窗長度為10秒,這意味著當10秒的時間間隔過去時,函數將被觸發,而不管視窗中有多少事件。

滑動視窗

滑動視窗方法通過設定清除策略來限制保留用於處理的資料量,並使用滑動間隔設定觸發器策略來定義固定的視窗長度。如果滑動間隔小於視窗長度,則存在資料重疊,這意味著同時落入相鄰視窗的資料將被多次用於計算。如下面的範例所示,視窗長度為2秒,這意味著任何超過2秒的資料都將被清除,不會在計算中使用。滑動間隔被設定為1秒,這意味著該函數每秒執行一次,以處理整個視窗長度內的資料。

函數設定

在獨立的Pulsar中建立和驗證函數(包括有狀態函數和視窗函數)的分步說明和範例

  • 在conf/broker.conf檔案(對於Pulsar standalone, conf/standalone.conf)中,將functionsWorkerEnabled設定為true。vim conf/broker.conf
functionsWorkerEnabled=true

如果是standalone Pulsar 在conf/standalone.conf檔案中增加上面的欄位。

  • 重啟broker
bin/pulsar-daemon stop broker
bin/pulsar-daemon start broker
  • 檢查Pulsar Function叢集
bin/pulsar-admin functions-worker get-cluster

函數範例

使用官方的函數範例演示,檢視根目錄下examples資料夾

  • 建立租戶和名稱空間
bin/pulsar-admin tenants create my-test
bin/pulsar-admin namespaces create my-test/my-namespace
bin/pulsar-admin namespaces list my-test
  • 修改 vim examples/example-function-config.yaml
tenant: "my-test"
namespace: "my-namespace"
name: "example"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://my-test/my-namespace/test_src"]
userConfig:
  "PublishTopic": "persistent://my-test/my-namespace/test_result"

output: "persistent://my-test/my-namespace/test_result"
autoAck: true
parallelism: 1
  • 建立函數
bin/pulsar-admin functions create \
   --function-config-file examples/example-function-config.yaml \
   --jar examples/api-examples.jar

  • 檢視函數的設定
bin/pulsar-admin functions get \
   --tenant my-test \
   --namespace my-namespace \
   --name example

  • 檢視狀態
bin/pulsar-admin functions status \
   --tenant my-test \
   --namespace my-namespace \
   --name example

  • 消費訊息
bin/pulsar-client consume persistent://my-test/my-namespace/test_result -s 'my-subscription' -p Earliest -n 0
  • 生產訊息
bin/pulsar-client produce persistent://my-test/my-namespace/test_src --messages "test-messages-`date`" -n 10
  • 檢視消費者的輸出

有狀態函數範例

  • 在BookKeeper中啟用streamStorage服務。目前服務使用的是NAR包,需要在conf/bookkeeper.conf檔案中進行設定。vim conf/bookkeeper.conf
### Grpc Server ###
#
## the grpc server port to listen on. default is 4181
storageserver.grpc.port=4181
#
#### Dlog Settings for table service ###
#
##### Replication Settings
dlog.bkcEnsembleSize=3
dlog.bkcWriteQuorumSize=2
dlog.bkcAckQuorumSize=2
#
#### Storage ###
#
## local storage directories for storing table ranges data (e.g. rocksdb sst files)
storage.range.store.dirs=data/bookkeeper/ranges
#
## whether the storage server capable of serving readonly tables. default is false.
storage.serve.readonly.tables=false
#
## the cluster controller schedule interval, in milliseconds. default is 30 seconds.
storage.cluster.controller.schedule.interval.ms=30000
  • 建立vim examples/example-stateful-function-config.yaml
tenant: "my-test"
namespace: "my-namespace"
name: "word_count"
className: "org.apache.pulsar.functions.api.examples.WordCountFunction"
inputs: ["persistent://my-test/my-namespace/wordcount_src"] # this function will read messages from these topics
autoAck: true
parallelism: 1
  • 建立函數
bin/pulsar-admin functions create \
   --function-config-file examples/example-stateful-function-config.yaml \
   --jar examples/api-examples.jar
  • 查詢帶有itxs鍵的函數的狀態表。該操作監視與itxs相關的更改。
bin/pulsar-admin functions querystate \
   --tenant my-test \
   --namespace my-namespace \
   --name word_count -k itxs -w
  • 消費訊息
bin/pulsar-client consume persistent://my-test/my-namespace/wordcount_result -s 'my-subscription' -p Earliest -n 0

bin/pulsar-client consume test_wordcount_dest -s 'my-subscription' -p Earliest -n 0
  • 生產訊息
bin/pulsar-client produce persistent://my-test/my-namespace/wordcount_src --messages "itxs" -n 10

視窗函數範例

  • 建立vim examples/example-stateful-function-config.yaml
tenant: "my-test"
namespace: "my-namespace"
name: "window-example"
className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
inputs: ["persistent://my-test/my-namespace/window_src"]
userConfig:
  "PublishTopic": "persistent://my-test/my-namespace/window_result"

output: "persistent://my-test/my-namespace/window_result"
autoAck: true
parallelism: 1
windowConfig:
  windowLengthCount: 10
  slidingIntervalCount: 5
  • 建立函數
bin/pulsar-admin functions create \
   --function-config-file examples/example-window-function-config.yaml \
   --jar examples/api-examples.jar
  • 消費訊息
bin/pulsar-client consume -s test-sub -n 0 persistent://my-test/my-namespace/window_result
  • 生產訊息
bin/pulsar-client produce -m "3" -n 10 persistent://my-test/my-namespace/window_src
  • 檢視消費視窗輸出

自定義函數開發

定義

Pulsar 函數支援Java、Python和Go等語言,如果是Java語言則支援下面三類介面:

  • 使用原生語言介面:不需要特定於Pulsar的庫或特殊依賴(只需要JDK核心庫);適合於不需要存取上下文的函數。
  • 使用Pulsar函數SDK:特定於脈衝星的庫,提供了語言本機介面中無法提供的一系列功能,例如狀態管理或使用者設定;適用於需要存取上下文的函數。
  • 擴充套件Pulsar函數SDK:對特定於pulse的庫的擴充套件,在Java中提供初始化和關閉介面。適用於需要初始化和釋放外部資源的函數。

原生語言介面範例

  • 新建Maven工程,Pom檔案內容如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itxs</groupId>
    <artifactId>pulsar-demo</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>sn.itxs.pulsar.function.NativeFunctionDemo</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.10.1</version>
            </plugin>
        </plugins>
    </build>
</project>
  • 建立NativeFunctionDemo.java
package sn.itxs.pulsar.function;

import java.util.function.Function;

public class NativeFunctionDemo implements Function<String, String> {
    @Override
    public String apply(String s) {
        return String.format("hahaha,native implement %s!", s);
    }
}
  • 打包生成pulsar-demo-1.0.jar,上傳到安裝Pulsar伺服器上的,這裡就放在pulsar根目錄下的examples資料夾,後續的操作就和前面函數範例一樣

  • 建立函數描述檔案,vim examples/native-example-function-config.yaml

tenant: "my-test"
namespace: "my-namespace"
name: "native-example"
className: "sn.itxs.pulsar.function.NativeFunctionDemo"
inputs: ["persistent://my-test/my-namespace/native_src"]
userConfig:
  "PublishTopic": "persistent://my-test/my-namespace/native_result"

output: "persistent://my-test/my-namespace/native_result"
autoAck: true
parallelism: 1
  • 建立函數
bin/pulsar-admin functions create \
   --function-config-file examples/native-example-function-config.yaml \
   --jar examples/pulsar-demo-1.0.jar
  • 消費訊息
bin/pulsar-client consume persistent://my-test/my-namespace/native_result -s 'my-subscription' -p Earliest -n 0
  • 生產訊息
bin/pulsar-client produce persistent://my-test/my-namespace/native_src --messages "actual pulsar" -n 10
  • 檢視消費者的輸出

Pulsar函數SDK範例

  • 由於依賴Pulsar函數SDK,因此JDK需要選擇17,在前面的工程新增Pom依賴
    <properties>
        <pulsar.version>2.11.0</pulsar.version>
    </properties>
        
    <dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-functions-api</artifactId>
            <version>${pulsar.version}</version>
        </dependency>
    </dependencies>
  • 打包指定sn.itxs.pulsar.function.SdkFunctionDemo

  • 建立SdkFunctionDemo.java

package sn.itxs.pulsar.function;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class SdkFunctionDemo implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        return String.format("hahaha,pulsar sdk implement %s!", input);
    }
}
  • 打包生成pulsar-demo-1.0.jar,上傳到安裝Pulsar伺服器上的,這裡還是覆蓋pulsar根目錄下的examples資料夾檔案,其他和前面一樣

  • 建立函數描述檔案,vim examples/sdk-example-function-config.yaml

tenant: "my-test"
namespace: "my-namespace"
name: "sdk-example"
className: "sn.itxs.pulsar.function.SdkFunctionDemo"
inputs: ["persistent://my-test/my-namespace/sdk_src"]
userConfig:
  "PublishTopic": "persistent://my-test/my-namespace/sdk_result"

output: "persistent://my-test/my-namespace/sdk_result"
autoAck: true
parallelism: 1
  • 建立函數
bin/pulsar-admin functions create \
   --function-config-file examples/sdk-example-function-config.yaml \
   --jar examples/pulsar-demo-1.0.jar
  • 消費訊息
bin/pulsar-client consume persistent://my-test/my-namespace/sdk_result -s 'my-subscription' -p Earliest -n 0
  • 生產訊息
bin/pulsar-client produce persistent://my-test/my-namespace/sdk_src --messages "actual pulsar" -n 10
  • 檢視消費者的輸出

  • 本人部落格網站IT小神 www.itxiaoshen.com