雲原生時代頂流訊息中介軟體Apache Pulsar部署實操之Pulsar IO與Pulsar SQL

2023-03-08 21:01:30

@

Pulsar IO (Connector聯結器)

基礎定義

Pulsar IO聯結器能夠輕鬆地建立、部署和管理與外部系統(如Apache Cassandra、Aerospike等)互動的聯結器。IO聯結器有兩種型別:源聯結器和接收器聯結器。

可以通過Connector Admin CLI使用源和接收器子命令管理Pulsar聯結器(例如,在聯結器上建立、更新、啟動、停止、重新啟動、重新載入、刪除和執行其他操作)。有關最新和完整的資訊,請參閱Pulsar管理檔案。

安裝Pulsar和內建聯結器

在將Pulsar連線到資料庫之前,需要先安裝Pulsar和所需的內建聯結器。要啟用Pulsar聯結器,您需要在下載頁面上下載聯結器的tarball版本。

# 下載最新版本2.11.0的pulsar-io-cassandra和pulsar-io-jdbc-postgres,需要什麼聯結器可以從官方檢視是否支援並下載,這裡舉例就下載兩個
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-cassandra-2.11.0.nar
https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-2.11.0/connectors/pulsar-io-jdbc-postgres-2.11.0.nar
# 在pulsar根目錄下建立目錄
mkdir connectors
# 將壓縮檔案移動connectors目錄
mv pulsar-io-jdbc-postgres-2.11.0.nar pulsar-io-jdbc-postgres-2.11.0.nar connectors
# 重啟pulsar
# 檢視可用聯結器列表
curl -w '\n' -s http://localhost:8080/admin/v2/functions/connectors

連線Pulsar到Cassandra

安裝cassandra叢集

# 下載映象並啟動cassandra測試容器
docker run -d --rm --name=cassandra -p 9042:9042 cassandra
# 檢視程序
docker ps
# 檢視執行紀錄檔
docker logs cassandra
# 等待一小段時間後檢視Cassandra叢集狀態
docker exec cassandra nodetool status
# 使用cqlsh連線到Cassandra叢集

# 使用cqlsh連線到Cassandra叢集
docker exec -ti cassandra cqlsh localhost
# 建立一個金鑰空間pulsar_itxs_keyspace
CREATE KEYSPACE pulsar_itxs_keyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};
# 建立一個表pulsar_itxs_table
USE pulsar_itxs_keyspace;
CREATE TABLE pulsar_itxs_table (key text PRIMARY KEY, col text);

設定Cassandra接收器

現在已經有一個Cassandra叢集在本地執行;要執行Cassandra接收器聯結器,需要準備一個組態檔,其中包括Pulsar聯結器執行時需要知道的資訊,例如Pulsar聯結器如何找到Cassandra叢集,Pulsar聯結器用於寫入Pulsar訊息的鍵空間和表是什麼等等;可以使用Json或者Yaml這兩種格式建立組態檔。

vim examples/cassandra-sink.json

{
    "roots": "192.168.3.100:9042",
    "keyspace": "pulsar_itxs_keyspace",
    "columnFamily": "pulsar_itxs_table",
    "keyname": "key",
    "columnName": "col"
}

vim examples/cassandra-sink.yml

configs:
    roots: "192.168.3.100:9042"
    keyspace: "pulsar_itxs_keyspace"
    columnFamily: "pulsar_itxs_table"
    keyname: "key"
    columnName: "col"

建立Cassandra Sink

可以使用Connector Admin CLI建立sink聯結器和操作。執行下面命令來建立一個Cassandra接收器聯結器,接收器型別為Cassandra,組態檔為上一步建立的examples/cassandra-sink.yml。

bin/pulsar-admin sinks create \
    --tenant my-test \
    --namespace my-namespace \
    --name cassandra-itxs-sink \
    --sink-type cassandra \
    --sink-config-file examples/cassandra-sink.yml \
    --inputs persistent://my-test/my-namespace/itxs_cassandra    

命令執行後,Pulsar建立接收器聯結器cassandra-itxs-sink。這個接收器聯結器作為Pulsar函數執行,並將主題itxs_cassandra中產生的訊息寫入Cassandra表pulsar_itxs_table;

可以使用Connector Admin CLI對聯結器進行監控和其他操作。

  • 獲取聯結器的資訊
bin/pulsar-admin sinks get \
  --tenant my-test \
  --namespace my-namespace \
  --name cassandra-itxs-sink
  • 檢查聯結器的狀態
bin/pulsar-admin sinks status \
  --tenant my-test \
  --namespace my-namespace \
  --name cassandra-itxs-sink

驗證Cassandra Sink結果

生成一些訊息到Cassandra接收器itxs_cassandra的輸入主題

for i in {0..9}; do bin/pulsar-client produce -m "itxskey-$i" -n 1 persistent://my-test/my-namespace/itxs_cassandra; done

再次檢視聯結器的狀態,可以有10條記錄處理統計資訊

檢視Cassandra的pulsar_itxs_table

USE pulsar_itxs_keyspace;
select * from pulsar_itxs_table;

刪除Cassandra Sink

bin/pulsar-admin sinks delete \
    --tenant my-test \
    --namespace my-namespace \
    --name cassandra-itxs-sink

連線Pulsar到PostgreSQL

安裝PostgreSQL叢集

這裡使用PostgreSQL 12 docker映象在docker中啟動一個單節點PostgreSQL叢集。

# 從Docker中拉取PostgreSQL 12映像
docker pull postgres:12
# 啟動postgres容器
docker run -d -it --rm \
    --name pulsar-postgres \
    -p 5432:5432 \
    -e POSTGRES_PASSWORD=password \
    -e POSTGRES_USER=postgres \
    postgres:12
# 檢視執行紀錄檔
docker logs -f pulsar-postgres
# 進入容器
docker exec -it pulsar-postgres /bin/bash
# 使用預設使用者名稱和密碼登入PostgreSQL
psql -U postgres postgres
# 使用以下命令建立pulsar_postgres_jdbc_sink表:
create table if not exists pulsar_postgres_jdbc_sink
(
id serial PRIMARY KEY,
name VARCHAR(255) NOT NULL
);

設定JDBC接收器

現在有一個本地執行的PostgreSQ,接下來需要設定JDBC接收器聯結器。

  • 建立組態檔vim connectors/pulsar-postgres-jdbc-sink.yaml
configs:
  userName: "postgres"
  password: "password"
  jdbcUrl: "jdbc:postgresql://192.169.3.100:5432/postgres"
  tableName: "pulsar_postgres_jdbc_sink"

建立JDBC Sink

執行下面命令後,Pulsar將建立接收器聯結器pulse -postgres-jdbc-sink。這個sink聯結器作為Pulsar函數執行,並將Topic為pulsar-postgres-jdbc-sink-topic中產生的訊息寫入PostgreSQL表pulsar_postgres_jdbc_sink。

bin/pulsar-admin sinks create \
    --tenant my-test \
    --namespace my-namespace \
    --archive ./connectors/pulsar-io-jdbc-postgres-2.11.0.nar \
    --inputs persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic \
    --name pulsar-postgres-my-jdbc-sink \
    --sink-config-file ./connectors/pulsar-postgres-jdbc-sink.yaml \
    --parallelism 1

列出所有的sink

bin/pulsar-admin sinks list \
    --tenant my-test \
    --namespace my-namespace

驗證JDBC Sink結果

通過JavaAPI生成一些訊息到Cassandra接收器pulsar-postgres-jdbc-sink-topic這個主題,在Java專案新增maven依賴

    <properties>
        <pulsar.version>2.11.0</pulsar.version>
    </properties>        
        

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

這裡演示實體類成員變數簡單就直接使用public宣告了

package sn.itxs.pulsar.io;

public class User{
    public int id;
    public String name;
}

新增ClientDemo.java

package sn.itxs.pulsar.io;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.AvroSchema;

public class ClientDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = null;
        Producer<User> producer = null;
        try {
            client = PulsarClient.builder()
                    .serviceUrl("pulsar://192.168.5.52:6650")
                    .build();

            producer = client.newProducer(AvroSchema.of(User.class))
                    .topic("persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic")
                    .create();
            User user = new User();
            int index = 10;
            while (index++ < 20) {
                try {
                    user.id = index;
                    user.name = "this is a test " + index;
                    producer.newMessage().value(user).send();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("send finish");
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (producer!=null){
                producer.close();
            }
            if (client!=null){
                client.close();
            }
        }
    }
}

執行程式後檢視PostgreSQL表pulsar_postgres_jdbc_sink,已經有剛才

上面由於在Java中建立了Schema,因此不需要手工建立,可以檢視當前persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic主體已生成Schema資訊如下:

如果要從pulsar-admin命令列建立schema可以這樣操作

  • 建立schema,建立一個avro-schema檔案,將以下內容複製到該檔案中,並將該檔案放在pulsar/connectors資料夾中。vim connectors/avro-schema
{
  "type": "AVRO",
  "schema": "{\"type\":\"record\",\"name\":\"Test\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}",
  "properties": {}
}
  • 上傳schema到topic,將avro-schema模式上傳到pulsar-postgres-jdbc-sink-topic主題
bin/pulsar-admin schemas upload persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic -f ./connectors/avro-schema
  • 檢查模式是否上傳成功。
bin/pulsar-admin schemas get persistent://my-test/my-namespace/pulsar-postgres-jdbc-sink-topic1

如需stop停止、restart重啟指定的sinks可以如下操作,當然也可以更新指定sinks,詳細命令可以查閱官網

bin/pulsar-admin sinks stop \
    --tenant my-test \
    --namespace my-namespace \
    --name pulsar-postgres-my-jdbc-sink \

Pulsar SQL

定義

Apache Pulsar用於儲存事件資料流,事件資料由預定義的欄位構成。通過模式登入檔的實現,可以在Pulsar中儲存結構化資料,並使用Trino(以前是Presto SQL)查詢資料。作為Pulsar SQL的核心,Pulsar Trino外掛使Trino叢集中的Trino worker能夠查詢來自Pulsar的資料.

由於Pulsar採用了基於兩級段的架構,因此查詢效能高效且可延伸性強。Pulsar中的主題在Apache BookKeeper中儲存為段。每個主題段被複制到一些BookKeeper節點上,從而支援並行讀和高讀吞吐量。在Pulsar Trino聯結器中,資料直接從BookKeeper中讀取,因此Trino worker可以同時從水平可延伸數量的BookKeeper節點中讀取

簡單使用

在Pulsar中查詢資料前,需要安裝Pulsar和內建聯結器。

# 這裡演示就直接啟動獨立叢集
PULSAR_STANDALONE_USE_ZOOKEEPER=1 ./bin/pulsar standalone
# 啟動一個Pulsar SQL worker
./bin/pulsar sql-worker run
# 初始化Pulsar獨立叢集和SQL worker後,執行SQL CLI:
./bin/pulsar sql
show catalogs;
show schemas in pulsar;
show tables in pulsar."public/default";

通過前面的Java範例,我們改為Json格式寫入Pulsar的user-topic

package sn.itxs.pulsar.io;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class ClientSqlDemo {
    public static void main(String[] args) throws Exception {
        PulsarClient client = null;
        Producer<User> producer = null;
        try {
            client = PulsarClient.builder()
                    .serviceUrl("pulsar://192.168.5.52:6650")
                    .build();

            producer = client.newProducer(Schema.JSON(User.class))
                    .topic("user-topic")
                    .create();
            User user = new User();
            int index = 10;
            while (index++ < 20) {
                try {
                    user.id = index;
                    user.name = "this is a test " + index;
                    producer.newMessage().value(user).send();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("send finish");
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (producer!=null){
                producer.close();
            }
            if (client!=null){
                client.close();
            }
        }
    }
}

執行程式後再來查詢就有剛才傳送的訊息資料,_開頭的欄位為Pulsar 自帶的。

select * from pulsar."public/default"."user-topic";

  • 本人部落格網站IT小神 www.itxiaoshen.com