國產開源流批統一的資料同步工具Chunjun入門實戰

2023-07-05 06:00:49

@

概述

定義

Chunjun 官網 https://dtstack.github.io/chunjun-web/ 原始碼release最新版本1.12.8

Chunjun 檔案地址 https://ververica.github.io/flink-cdc-connectors/master/

Chunjun 原始碼地址 https://github.com/DTStack/chunjun

Chunjun是一個分散式整合框架,原名是FlinkX,由袋鼠雲開源,其基於Flink的批流統一打造的資料同步工具,可以實現各種異構資料來源之間的資料同步和計算。

ChunJun是一個基於 Flink 提供易用、穩定、高效的批流統一的資料整合工具,可以採集靜態的資料如 MySQL,HDFS 等,也可以採集實時變化的資料如 binlog,Kafka等。

特性

  • 易使用:基於JSON模板和SQL指令碼 快速構建資料同步任務,SQL指令碼相容Flink SQL語法;只需要關注資料來源的結構資訊即可, 節省了時間,專注於資料整合的開發。FlinkX既支援資料同步、實時採集,也支援SQL流與維表的Join,實現了一套外掛完成資料的同步、轉換與計算。
  • 基於 Flink:基於flink 原生的input,output 相關介面來實現多種資料來源之間的資料傳輸,同時可以基於 flink 擴充套件外掛。易於擴充套件,高度靈活,新擴充套件的資料來源外掛可以瞬間與現有的資料來源外掛整合,外掛開發人員無需關心其他外掛的程式碼邏輯;
  • 多種執行模式:支援分散式運算元 支援 flink-standalone、yarn-session、 yarn-per job 及其他提交任務方式。支援Docker一鍵式部署,支援在k8s上部署和執行,支援使用native kuberentes方式以session和run-application模式提交任務。
  • 關鍵特性
    • 多種資料來源之間資料傳輸 ,支援MySQL、Oracle、SQLServer、Hive、Kudu等20多種資料來源的同步計算
    • 斷點續傳 :配合flink檢查點機制,實現斷點恢復、任務容災。比如針對斷點續傳主要是設定斷點續傳欄位和斷點續傳欄位在reader裡的column的位置,當然前提任務也是得開啟checkpoint。
      • 部分外掛支援通過Flink的checkpoint機制從失敗的位置恢復任務。斷點續傳對資料來源 ️強制要求:
        • 必須包含一個升序的欄位,比如主鍵或者日期型別的欄位,同步過程中會使用checkpoint機制記錄這個欄位的值,任務恢復執行時使用這個欄位構造查詢條件過濾已經同步過的資料,如果這個欄位的值不是升序的,那麼任務恢復時過濾的資料就是錯誤的,最終導致資料的缺失或重複。
        • 資料來源必須支援資料過濾,如果不支援的話,任務就無法從斷點處恢復執行,會導致資料重複。
        • 目標資料來源必須支援事務,比如關聯式資料庫,檔案型別的資料來源也可以通過臨時檔案的方式支援。
    • 全量與增量同步:不僅支援同步DML資料,還支援DDL同步,如'CREATE TABLE', 'ALTER COLUMN'等;比如利用增量鍵,資料庫表中增量遞增的欄位,比如自增id及其開始位置。
    • 實時採集:既支援離線同步計算,又相容實時場景;實時資料還原。
    • FlinkX支援二階段提交,目前FlinkX幾乎所有外掛都支援二階段提交。
    • FlinkX支援資料湖 Iceberg,可以流式讀取和寫入Iceberg資料湖,未來也會加入Hudi支援。
    • 流控管理:巨量資料同步時在負載高的時候有時候會給系統帶來很大的壓力,FlinkX使用令牌桶限流方式限速,當源端產生資料的速率達到一定閾值就不會讀取資料。
    • 大多數外掛支援資料的並行讀寫,可以大大提高讀寫速度;
    • 髒資料管理:異構系統執行巨量資料遷移不可避免的會有髒資料產生,髒資料會影響同步任務的執行,FlinkX的Writer外掛在寫資料是會把以下幾種型別作為髒資料寫入髒資料表裡:
      • 型別轉換錯誤
      • 空指標
      • 主鍵衝突
      • 其它錯誤

部署

安裝

  • 部署Flink叢集(使用前面)
  • 獲取原始碼編譯打包
# 最新release版本原始碼flink12.7,如果是下載主線master版本,目前原始碼預設引入flink16.1,可以通過git clone https://github.com/DTStack/chunjun.git也可以直接http下main,由於是學習可使用master版本來踩坑
wget https://github.com/DTStack/chunjun/archive/refs/tags/v1.12.8.tar.gz
tar -xvf v1.12.8.tar.gz
# 進入原始碼目錄
cd chunjun-1.12.8/
# 編譯打包執行,下面兩種選一
./mvnw clean package
sh build/build.sh

在根目錄下生成 chunjun-dist目錄,官方提供豐富的範例程式,詳細可以檢視chunjun-examples目錄

版本對應關係

下表顯示了ChunJun分支與flink版本的對應關係。如果版本沒有對齊,在任務中會出現'Serialization Exceptions', 'NoSuchMethod Exception'等問題。

通用設定詳解

整體設定

一個完整的 ChunJun 任務指令碼設定包含 content, setting 兩個部分。content 用於設定任務的輸入源與輸出源,其中包含 reader,writer。而 setting 則設定任務整體的環境設定,其中包含 speed,errorLimit,metricPluginConf,restore,log,dirty。總體結構如下所示:

{
    "job" : {
       "content" :[{
            "reader" : {},
            "writer" : {}
       }],
       "setting" : {
          "speed" : {},
          "errorLimit" : {},
          "metricPluginConf" : {},
          "restore" : {},
          "log" : {},
          "dirty":{}
        }
    }
}

Content 設定

reader 用於設定資料的輸入源,即資料從何而來。具體設定如下所示:

"reader" : {
  "name" : "xxreader",
  "parameter" : {
        ......
  }
}

Writer 用於設定資料的輸出目的地,即資料寫到哪裡去。具體設定如下所示:

"writer" : {
  "name" : "xxwriter",
  "parameter" : {
        ......
  }
}

Setting 設定

  • speed 用於設定任務並行數及速率限制。具體設定如下所示
  • errorLimit 用於設定任務執行時資料讀取寫入的出錯控制。
  • metricPluginConf 用於設定 flinkx 指標相關資訊。目前只應用於 Jdbc 外掛中,在作業結束時將 StartLocation 和 EndLocation 指標傳送到指定資料來源中。目前支援 Prometheus 和 Mysql。
  • restore 用於設定同步任務型別(離線同步、實時採集)和斷點續傳功能。
  • log 用於設定 ChunJun 中定義的外掛紀錄檔的儲存與記錄。
  • dirty 用於設定髒資料的儲存,通常與 ErrorLimit 聯合使用。

詳細使用檢視官方的說明

Local提交

進入Chunjun根目錄,測試指令碼執行本地環境,檢視stream.json

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "id"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "content",
                "type": "string"
              }
            ],
            "sliceRecordCount": [
              "30"
            ],
            "permitsPerSecond": 1
          },
          "table": {
            "tableName": "sourceTable"
          },
          "name": "streamreader"
        },
        "writer": {
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "id"
              },
              {
                "name": "name",
                "type": "string"
              }
            ],
            "print": true
          },
          "table": {
            "tableName": "sinkTable"
          },
          "name": "streamwriter"
        },
        "transformer": {
          "transformSql": "select id,name from sourceTable where CHAR_LENGTH(name) < 50 and CHAR_LENGTH(content) < 50"
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": 100
      },
      "speed": {
        "bytes": 0,
        "channel": 1,
        "readerChannel": 1,
        "writerChannel": 1
      }
    }
  }
}
bash ./bin/chunjun-local.sh  -job chunjun-examples/json/stream/stream.json

Standalone提交

Json方式使用

將依賴檔案複製到Flink lib目錄下,這個複製操作需要在所有Flink cluster機器上執行

cp -r chunjun-dist $FLINK_HOME/lib

啟動Flink Standalone環境

sh $FLINK_HOME/bin/start-cluster.sh

準備mysql的資料,作為讀取資料來源

準備job檔案,建立chunjun-examples/json/mysql/mysql_hdfs_polling_my.json

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column" : [
              {
                "name" : "id",
                "type" : "bigint"
              },{
                  "name" : "name",
                  "type" : "varchar"
              },{
                  "name" : "age",
                  "type" : "bigint"
              }
            ],
            "splitPk": "id",
            "splitStrategy": "mod",
            "increColumn": "id",
            "startLocation": "1",
            "username": "root",
            "password": "123456",
            "queryTimeOut": 2000,
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://mysqlserver:3308/my_maxwell_01?useSSL=false"
                ],
                "table": [
                  "account"
                ]
              }
            ],
            "polling": false,
            "pollingInterval": 3000
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "fileType": "text",
            "path": "hdfs://myns/user/hive/warehouse/chunjun.db/kudu_txt",
            "defaultFS": "hdfs://myns",
            "fileName": "pt=1",
            "fieldDelimiter": ",",
            "encoding": "utf-8",
            "writeMode": "overwrite",
            "column": [
              {
                "name": "id",
                "type": "BIGINT"
              },
              {
                "name": "VARCHAR",
                "type": "VARCHAR"
              },
              {
                "name": "age",
                "type": "BIGINT"
              }
            ],
            "hadoopConfig": {
              "hadoop.user.name": "root",
              "dfs.ha.namenodes.ns": "nn1,nn2",
              "fs.defaultFS": "hdfs://myns",
              "dfs.namenode.rpc-address.ns.nn2": "hadoop1:9000",
              "dfs.client.failover.proxy.provider.ns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.ns.nn1": "hadoop2:9000",
              "dfs.nameservices": "myns",
              "fs.hdfs.impl.disable.cache": "true",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting" : {
      "restore" : {
        "restoreColumnName" : "id",
        "restoreColumnIndex" : 0
      },
      "speed" : {
        "bytes" : 0,
        "readerChannel" : 3,
        "writerChannel" : 3
      }
    }
  }
}

啟動同步任務

bash ./bin/chunjun-standalone.sh  -job chunjun-examples/json/mysql/mysql_hdfs_polling_my.json

任務執行完後通過web控制檯可以看到執行成功資訊,檢視HDFS路徑資料也可以看到剛剛成功寫入的資料

SQL方式使用

MySQL Sink

建立一個個Kafka的topic用於資料來源讀取

kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 3 --topic my_test1

ClickHouse建立testdb資料庫和sql_side_table表

CREATE DATABASE IF NOT EXISTS testdb;

CREATE TABLE if not exists sql_side_table
(
    id Int64,
    test1 Int64,
    test2 Int64
) ENGINE = MergeTree()
PRIMARY KEY (id);
insert into sql_side_table values(1,11,101),(2,12,102),(3,13,103);

MySQL建立sql_sink_table表

CREATE TABLE `sql_sink_table` (
  `id` bigint NOT NULL,
  `name` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `test1` bigint DEFAULT NULL,
  `test2` bigint DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci

建立sql檔案chunjun-examples/sql/clickhouse/kafka_clickhouse_my.sql

CREATE TABLE source (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka-x',
  'topic' = 'my_test1',
  'properties.bootstrap.servers' = 'kafka1:9092',
  'properties.group.id' = 'dodge',
  'format' = 'json'
);

CREATE TABLE side (
  id BIGINT,
  test1 BIGINT,
  test2 BIGINT
) WITH (
  'connector' = 'clickhouse-x',
  'url' = 'jdbc:clickhouse://ck1:8123/testdb',
  'table-name' = 'sql_side_table',
  'username' = 'default',
  'lookup.cache-type' = 'lru'
);

CREATE TABLE sink (
  id BIGINT,
  name VARCHAR,
  test1 BIGINT,
  test2 BIGINT
)WITH (
      'connector' = 'mysql-x',
      'url' = 'jdbc:mysql://mysqlserver:3306/test',
      'table-name' = 'sql_sink_table',
      'username' = 'root',
      'password' = '123456',
      'sink.buffer-flush.max-rows' = '1024',
      'sink.buffer-flush.interval' = '10000',
      'sink.all-replace' = 'true'
      );

INSERT INTO sink
  SELECT
    s1.id AS id,
    s1.name AS name,
    s2.test1 AS test1,
    s2.test2 AS test2
  FROM source s1
  JOIN side s2
  ON s1.id = s2.id

啟動同步任務

bash ./bin/chunjun-standalone.sh  -job chunjun-examples/sql/clickhouse/kafka_clickhouse_my.sql

往kafka的my_test1這個topic寫入資料

./kafka-console-producer.sh --broker-list cdh1:9092 --topic my_test1
{"id":1,"name":"sunhaiyang"}
{"id":2,"name":"gulili"}

檢視MySQL的sql_sink_table表已經有剛才寫入訊息並關聯出結果的資料

Kafka Sink

建立兩個Kafka的topic,一個用於資料來源讀取,一個用於資料來源寫入

kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 3 --topic my_test3
kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 3 --topic my_test4

建立sql檔案chunjun-examples/sql/kafka/kafka_kafka_my.sql

CREATE TABLE source_test (
    id INT
    , name STRING
    , money decimal
    , datethree timestamp
    , `partition` BIGINT METADATA VIRTUAL -- from Kafka connector
    , `topic` STRING METADATA VIRTUAL -- from Kafka connector
    , `leader-epoch` int METADATA VIRTUAL -- from Kafka connector
    , `offset` BIGINT METADATA VIRTUAL  -- from Kafka connector
    , ts TIMESTAMP(3) METADATA FROM 'timestamp' -- from Kafka connector
    , `timestamp-type` STRING METADATA VIRTUAL  -- from Kafka connector
    , partition_id BIGINT METADATA FROM 'partition' VIRTUAL   -- from Kafka connector
    , WATERMARK FOR datethree AS datethree - INTERVAL '5' SECOND
) WITH (
      'connector' = 'kafka-x'
      ,'topic' = 'my_test3'
      ,'properties.bootstrap.servers' = 'kafka1:9092'
      ,'properties.group.id' = 'test1'
      ,'scan.startup.mode' = 'earliest-offset'
      ,'format' = 'json'
      ,'json.timestamp-format.standard' = 'SQL'
      ,'scan.parallelism' = '2'
      );

CREATE TABLE sink_test
(
    id INT
    , name STRING
    , money decimal
    , datethree timestamp
    , `partition` BIGINT
    , `topic` STRING
    , `leader-epoch` int
    , `offset` BIGINT
    , ts TIMESTAMP(3)
    , `timestamp-type` STRING
    , partition_id BIGINT
) WITH (
      'connector' = 'kafka-x'
      ,'topic' = 'my_test4'
      ,'properties.bootstrap.servers' = 'kafka1:9092'
      ,'format' = 'json'
      ,'sink.parallelism' = '2'
      ,'json.timestamp-format.standard' = 'SQL'
      );

INSERT INTO sink_test
SELECT *
from source_test;

往kafka的my_test3這個topic寫入資料

kafka-console-producer.sh --broker-list cdh1:9092 --topic my_test3
{"id":100,"name":"guocai","money":243.18,"datethree":"2023-07-03 22:00:00.000"}
{"id":101,"name":"hanmeimei","money":137.32,"datethree":"2023-07-03 22:00:01.000"}

啟動同步任務

bash ./bin/chunjun-standalone.sh  -job chunjun-examples/sql/kafka/kafka_kafka_my.sql

檢視kafka的my_test4的資料,已經收到相應資料並打上kafka後設資料資訊

kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_test4 --from-beginning

  • 本人部落格網站IT小神 www.itxiaoshen.com