@
Function instance(函數範例)是函數執行框架的核心元素,由以下元素組成:
函數範例的內部工作流
Function worker 是一個邏輯元件,用於在Pulsar Functions的叢集模式部署中監視、編排和執行單個函數。每個函數範例都可以作為執行緒或程序執行,具體取決於所選的設定。如果Kubernetes叢集可用,則可以在Kubernetes中以StatefulSets的形式生成函數。
Function worker的內部架構和工作流如下
函數範例是在執行時內呼叫的,許多範例可以並行執行。Pulsar支援三種不同成本和隔離保證的函數執行時型別,以最大限度地提高部署靈活性。可以根據需要使用其中之一來執行函數。
Pulsar提供了三種不同的訊息傳遞語意,可以將它們應用於一個函數。根據ack時間節點確定不同的傳遞語意實現。
提示
可以在建立函數時設定函數的處理保證。如下面的命令建立了一個應用了「精確一次」保證的函數。
bin/pulsar-admin functions create \
--name my-effectively-once-function \
--processing-guarantees EFFECTIVELY_ONCE \
可以使用update命令更改應用於函數的處理保證。
bin/pulsar-admin functions update \
--processing-guarantees ATMOST_ONCE \
目前,視窗函數僅在Java中可用,並且不支援MANUAL和effective -once delivery語意。視窗函數是跨資料視窗(即事件流的有限子集)執行計算的函數。如下圖所示,流被劃分為「桶」,其中可以應用函數。
函數的資料視窗定義包含兩個策略:
觸發策略和驅逐策略都由時間或計數驅動。
提示
捲動視窗將元素分配給具有指定時間長度或計數的視窗。捲動視窗的驅逐策略總是基於視窗已滿。因此只需要指定觸發器策略,基於計數或基於時間。在具有基於計數的觸發策略的捲動視窗中,如以下範例所示,觸發策略被設定為2。當視窗中有兩個專案時,無論時間如何,都會觸發並執行每個函數。
相反,如下面的範例所示,捲動視窗的視窗長度為10秒,這意味著當10秒的時間間隔過去時,函數將被觸發,而不管視窗中有多少事件。
滑動視窗方法通過設定清除策略來限制保留用於處理的資料量,並使用滑動間隔設定觸發器策略來定義固定的視窗長度。如果滑動間隔小於視窗長度,則存在資料重疊,這意味著同時落入相鄰視窗的資料將被多次用於計算。如下面的範例所示,視窗長度為2秒,這意味著任何超過2秒的資料都將被清除,不會在計算中使用。滑動間隔被設定為1秒,這意味著該函數每秒執行一次,以處理整個視窗長度內的資料。
在獨立的Pulsar中建立和驗證函數(包括有狀態函數和視窗函數)的分步說明和範例
functionsWorkerEnabled=true
如果是standalone Pulsar 在conf/standalone.conf檔案中增加上面的欄位。
bin/pulsar-daemon stop broker
bin/pulsar-daemon start broker
bin/pulsar-admin functions-worker get-cluster
使用官方的函數範例演示,檢視根目錄下examples資料夾
bin/pulsar-admin tenants create my-test
bin/pulsar-admin namespaces create my-test/my-namespace
bin/pulsar-admin namespaces list my-test
tenant: "my-test"
namespace: "my-namespace"
name: "example"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://my-test/my-namespace/test_src"]
userConfig:
"PublishTopic": "persistent://my-test/my-namespace/test_result"
output: "persistent://my-test/my-namespace/test_result"
autoAck: true
parallelism: 1
bin/pulsar-admin functions create \
--function-config-file examples/example-function-config.yaml \
--jar examples/api-examples.jar
bin/pulsar-admin functions get \
--tenant my-test \
--namespace my-namespace \
--name example
bin/pulsar-admin functions status \
--tenant my-test \
--namespace my-namespace \
--name example
bin/pulsar-client consume persistent://my-test/my-namespace/test_result -s 'my-subscription' -p Earliest -n 0
bin/pulsar-client produce persistent://my-test/my-namespace/test_src --messages "test-messages-`date`" -n 10
### Grpc Server ###
#
## the grpc server port to listen on. default is 4181
storageserver.grpc.port=4181
#
#### Dlog Settings for table service ###
#
##### Replication Settings
dlog.bkcEnsembleSize=3
dlog.bkcWriteQuorumSize=2
dlog.bkcAckQuorumSize=2
#
#### Storage ###
#
## local storage directories for storing table ranges data (e.g. rocksdb sst files)
storage.range.store.dirs=data/bookkeeper/ranges
#
## whether the storage server capable of serving readonly tables. default is false.
storage.serve.readonly.tables=false
#
## the cluster controller schedule interval, in milliseconds. default is 30 seconds.
storage.cluster.controller.schedule.interval.ms=30000
tenant: "my-test"
namespace: "my-namespace"
name: "word_count"
className: "org.apache.pulsar.functions.api.examples.WordCountFunction"
inputs: ["persistent://my-test/my-namespace/wordcount_src"] # this function will read messages from these topics
autoAck: true
parallelism: 1
bin/pulsar-admin functions create \
--function-config-file examples/example-stateful-function-config.yaml \
--jar examples/api-examples.jar
bin/pulsar-admin functions querystate \
--tenant my-test \
--namespace my-namespace \
--name word_count -k itxs -w
bin/pulsar-client consume persistent://my-test/my-namespace/wordcount_result -s 'my-subscription' -p Earliest -n 0
bin/pulsar-client consume test_wordcount_dest -s 'my-subscription' -p Earliest -n 0
bin/pulsar-client produce persistent://my-test/my-namespace/wordcount_src --messages "itxs" -n 10
tenant: "my-test"
namespace: "my-namespace"
name: "window-example"
className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
inputs: ["persistent://my-test/my-namespace/window_src"]
userConfig:
"PublishTopic": "persistent://my-test/my-namespace/window_result"
output: "persistent://my-test/my-namespace/window_result"
autoAck: true
parallelism: 1
windowConfig:
windowLengthCount: 10
slidingIntervalCount: 5
bin/pulsar-admin functions create \
--function-config-file examples/example-window-function-config.yaml \
--jar examples/api-examples.jar
bin/pulsar-client consume -s test-sub -n 0 persistent://my-test/my-namespace/window_result
bin/pulsar-client produce -m "3" -n 10 persistent://my-test/my-namespace/window_src
Pulsar 函數支援Java、Python和Go等語言,如果是Java語言則支援下面三類介面:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itxs</groupId>
<artifactId>pulsar-demo</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>sn.itxs.pulsar.function.NativeFunctionDemo</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
</plugin>
</plugins>
</build>
</project>
package sn.itxs.pulsar.function;
import java.util.function.Function;
public class NativeFunctionDemo implements Function<String, String> {
@Override
public String apply(String s) {
return String.format("hahaha,native implement %s!", s);
}
}
打包生成pulsar-demo-1.0.jar,上傳到安裝Pulsar伺服器上的,這裡就放在pulsar根目錄下的examples資料夾,後續的操作就和前面函數範例一樣
建立函數描述檔案,vim examples/native-example-function-config.yaml
tenant: "my-test"
namespace: "my-namespace"
name: "native-example"
className: "sn.itxs.pulsar.function.NativeFunctionDemo"
inputs: ["persistent://my-test/my-namespace/native_src"]
userConfig:
"PublishTopic": "persistent://my-test/my-namespace/native_result"
output: "persistent://my-test/my-namespace/native_result"
autoAck: true
parallelism: 1
bin/pulsar-admin functions create \
--function-config-file examples/native-example-function-config.yaml \
--jar examples/pulsar-demo-1.0.jar
bin/pulsar-client consume persistent://my-test/my-namespace/native_result -s 'my-subscription' -p Earliest -n 0
bin/pulsar-client produce persistent://my-test/my-namespace/native_src --messages "actual pulsar" -n 10
<properties>
<pulsar.version>2.11.0</pulsar.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>${pulsar.version}</version>
</dependency>
</dependencies>
打包指定
建立SdkFunctionDemo.java
package sn.itxs.pulsar.function;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
public class SdkFunctionDemo implements Function<String, String> {
@Override
public String process(String input, Context context) {
return String.format("hahaha,pulsar sdk implement %s!", input);
}
}
打包生成pulsar-demo-1.0.jar,上傳到安裝Pulsar伺服器上的,這裡還是覆蓋pulsar根目錄下的examples資料夾檔案,其他和前面一樣
建立函數描述檔案,vim examples/sdk-example-function-config.yaml
tenant: "my-test"
namespace: "my-namespace"
name: "sdk-example"
className: "sn.itxs.pulsar.function.SdkFunctionDemo"
inputs: ["persistent://my-test/my-namespace/sdk_src"]
userConfig:
"PublishTopic": "persistent://my-test/my-namespace/sdk_result"
output: "persistent://my-test/my-namespace/sdk_result"
autoAck: true
parallelism: 1
bin/pulsar-admin functions create \
--function-config-file examples/sdk-example-function-config.yaml \
--jar examples/pulsar-demo-1.0.jar
bin/pulsar-client consume persistent://my-test/my-namespace/sdk_result -s 'my-subscription' -p Earliest -n 0
bin/pulsar-client produce persistent://my-test/my-namespace/sdk_src --messages "actual pulsar" -n 10