ChunJun official website: https://dtstack.github.io/chunjun-web/ (latest source release: v1.12.8)
ChunJun documentation: https://ververica.github.io/flink-cdc-connectors/master/
ChunJun source code: https://github.com/DTStack/chunjun
ChunJun is a distributed data integration framework, formerly named FlinkX, open-sourced by DTStack (袋鼠雲). Built on Flink's unified batch and stream processing, it can synchronize and compute data across heterogeneous data sources.
ChunJun is an easy-to-use, stable, and efficient batch/stream-unified data integration tool based on Flink. It can collect static data such as MySQL and HDFS, as well as continuously changing data such as binlogs and Kafka.
# The latest release builds against Flink 1.12.7; the master branch currently pulls in Flink 1.16.1 by default. You can git clone https://github.com/DTStack/chunjun.git or download directly over HTTP; since this is for learning, the master branch is fine if you are willing to work through the rough edges.
wget https://github.com/DTStack/chunjun/archive/refs/tags/v1.12.8.tar.gz
tar -xvf v1.12.8.tar.gz
# Enter the source directory
cd chunjun-1.12.8/
# Build and package; pick one of the two commands below
./mvnw clean package
sh build/build.sh
The build generates a chunjun-dist directory in the project root. The project also ships with a rich set of example jobs; see the chunjun-examples directory for details.
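A quick sanity check after the build (a minimal sketch; the exact directory contents vary by version):
# list the packaged plugins and the bundled example jobs
ls chunjun-dist/
ls chunjun-examples/json/ chunjun-examples/sql/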
ChunJun branches must be paired with their matching Flink version (for example, the v1.12.8 release builds against Flink 1.12.7, while master targets Flink 1.16.1). If the versions are not aligned, jobs will fail with errors such as 'Serialization Exceptions' or 'NoSuchMethodException'.
A complete ChunJun job script consists of two parts: content and setting. content configures the job's input and output, containing reader and writer. setting configures the job's overall runtime behavior, containing speed, errorLimit, metricPluginConf, restore, log, and dirty. The overall structure looks like this:
{
  "job" : {
    "content" : [{
      "reader" : {},
      "writer" : {}
    }],
    "setting" : {
      "speed" : {},
      "errorLimit" : {},
      "metricPluginConf" : {},
      "restore" : {},
      "log" : {},
      "dirty" : {}
    }
  }
}
reader configures the data source, i.e., where the data comes from:
"reader" : {
"name" : "xxreader",
"parameter" : {
......
}
}
writer configures the data destination, i.e., where the data is written to:
"writer" : {
"name" : "xxwriter",
"parameter" : {
......
}
}
See the official documentation for the detailed options of each plugin.
Go to the ChunJun root directory and test-run a job in local mode. Take a look at stream.json (chunjun-examples/json/stream/stream.json) first:
{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "id"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "content",
                "type": "string"
              }
            ],
            "sliceRecordCount": [
              "30"
            ],
            "permitsPerSecond": 1
          },
          "table": {
            "tableName": "sourceTable"
          },
          "name": "streamreader"
        },
        "writer": {
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "id"
              },
              {
                "name": "name",
                "type": "string"
              }
            ],
            "print": true
          },
          "table": {
            "tableName": "sinkTable"
          },
          "name": "streamwriter"
        },
        "transformer": {
          "transformSql": "select id,name from sourceTable where CHAR_LENGTH(name) < 50 and CHAR_LENGTH(content) < 50"
        }
      }
    ],
    "setting": {
      "errorLimit": {
        "record": 100
      },
      "speed": {
        "bytes": 0,
        "channel": 1,
        "readerChannel": 1,
        "writerChannel": 1
      }
    }
  }
}
bash ./bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json
Copy the dependency files into the Flink lib directory. This copy must be performed on every machine in the Flink cluster.
cp -r chunjun-dist $FLINK_HOME/lib
Start the Flink standalone cluster
sh $FLINK_HOME/bin/start-cluster.sh
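Before submitting anything, you can confirm the cluster is up; the standalone web UI listens on port 8081 on the JobManager host by default:
# an empty job list confirms the JobManager is reachable
$FLINK_HOME/bin/flink list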
Prepare some MySQL data to read from as the source.
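As a reference, here is a minimal sketch of a source table matching the reader configuration in the job file below; the database and table names come from that configuration, while the sample rows are invented for illustration:
-- hypothetical sample data; columns mirror the reader's column list (id, name, age)
CREATE TABLE my_maxwell_01.account (
  id   BIGINT NOT NULL,
  name VARCHAR(100),
  age  BIGINT,
  PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO my_maxwell_01.account (id, name, age) VALUES
  (1, 'zhangsan', 20),
  (2, 'lisi', 30),
  (3, 'wangwu', 40);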
Prepare the job file: create chunjun-examples/json/mysql/mysql_hdfs_polling_my.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "column": [
              {
                "name": "id",
                "type": "bigint"
              },
              {
                "name": "name",
                "type": "varchar"
              },
              {
                "name": "age",
                "type": "bigint"
              }
            ],
            "splitPk": "id",
            "splitStrategy": "mod",
            "increColumn": "id",
            "startLocation": "1",
            "username": "root",
            "password": "123456",
            "queryTimeOut": 2000,
            "connection": [
              {
                "jdbcUrl": [
                  "jdbc:mysql://mysqlserver:3308/my_maxwell_01?useSSL=false"
                ],
                "table": [
                  "account"
                ]
              }
            ],
            "polling": false,
            "pollingInterval": 3000
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "fileType": "text",
            "path": "hdfs://myns/user/hive/warehouse/chunjun.db/kudu_txt",
            "defaultFS": "hdfs://myns",
            "fileName": "pt=1",
            "fieldDelimiter": ",",
            "encoding": "utf-8",
            "writeMode": "overwrite",
            "column": [
              {
                "name": "id",
                "type": "BIGINT"
              },
              {
                "name": "name",
                "type": "VARCHAR"
              },
              {
                "name": "age",
                "type": "BIGINT"
              }
            ],
            "hadoopConfig": {
              "hadoop.user.name": "root",
              "dfs.ha.namenodes.myns": "nn1,nn2",
              "fs.defaultFS": "hdfs://myns",
              "dfs.namenode.rpc-address.myns.nn2": "hadoop1:9000",
              "dfs.client.failover.proxy.provider.myns": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
              "dfs.namenode.rpc-address.myns.nn1": "hadoop2:9000",
              "dfs.nameservices": "myns",
              "fs.hdfs.impl.disable.cache": "true",
              "fs.hdfs.impl": "org.apache.hadoop.hdfs.DistributedFileSystem"
            }
          }
        }
      }
    ],
    "setting": {
      "restore": {
        "restoreColumnName": "id",
        "restoreColumnIndex": 0
      },
      "speed": {
        "bytes": 0,
        "readerChannel": 3,
        "writerChannel": 3
      }
    }
  }
}
Start the sync job
bash ./bin/chunjun-standalone.sh -job chunjun-examples/json/mysql/mysql_hdfs_polling_my.json
After the job finishes, the Flink web UI shows that it completed successfully, and the newly written data can be seen under the HDFS path.
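A quick command-line check of the output; the path comes from the writer configuration above, though the exact file names underneath depend on the writer's fileName setting:
# recursively list the output directory and print the written files
hdfs dfs -ls -R /user/hive/warehouse/chunjun.db/kudu_txt
hdfs dfs -cat '/user/hive/warehouse/chunjun.db/kudu_txt/pt=1/*'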
Create a Kafka topic to read from as the data source
kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 3 --topic my_test1
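Optionally confirm the topic exists (same ZooKeeper connection string as above):
kafka-topics.sh --describe --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic my_test1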
In ClickHouse, create the testdb database and the sql_side_table table, and load a few rows of dimension data
CREATE DATABASE IF NOT EXISTS testdb;
CREATE TABLE IF NOT EXISTS testdb.sql_side_table
(
    id    Int64,
    test1 Int64,
    test2 Int64
) ENGINE = MergeTree()
PRIMARY KEY (id);
INSERT INTO testdb.sql_side_table VALUES (1, 11, 101), (2, 12, 102), (3, 13, 103);
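To double-check the dimension rows before wiring them into the job:
SELECT * FROM testdb.sql_side_table ORDER BY id;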
In MySQL, create the sql_sink_table table in the test database (the database referenced by the sink's JDBC URL)
CREATE TABLE `sql_sink_table` (
  `id` bigint NOT NULL,
  `name` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `test1` bigint DEFAULT NULL,
  `test2` bigint DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
Create the SQL file chunjun-examples/sql/clickhouse/kafka_clickhouse_my.sql
CREATE TABLE source (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'kafka-x',
    'topic' = 'my_test1',
    'properties.bootstrap.servers' = 'kafka1:9092',
    'properties.group.id' = 'dodge',
    'format' = 'json'
);

CREATE TABLE side (
    id BIGINT,
    test1 BIGINT,
    test2 BIGINT
) WITH (
    'connector' = 'clickhouse-x',
    'url' = 'jdbc:clickhouse://ck1:8123/testdb',
    'table-name' = 'sql_side_table',
    'username' = 'default',
    'lookup.cache-type' = 'lru'
);

CREATE TABLE sink (
    id BIGINT,
    name VARCHAR,
    test1 BIGINT,
    test2 BIGINT
) WITH (
    'connector' = 'mysql-x',
    'url' = 'jdbc:mysql://mysqlserver:3306/test',
    'table-name' = 'sql_sink_table',
    'username' = 'root',
    'password' = '123456',
    'sink.buffer-flush.max-rows' = '1024',
    'sink.buffer-flush.interval' = '10000',
    'sink.all-replace' = 'true'
);

INSERT INTO sink
SELECT
    s1.id    AS id,
    s1.name  AS name,
    s2.test1 AS test1,
    s2.test2 AS test2
FROM source s1
JOIN side s2
    ON s1.id = s2.id;
Start the sync job
bash ./bin/chunjun-standalone.sh -job chunjun-examples/sql/clickhouse/kafka_clickhouse_my.sql
Write data to the Kafka topic my_test1
./kafka-console-producer.sh --broker-list cdh1:9092 --topic my_test1
{"id":1,"name":"sunhaiyang"}
{"id":2,"name":"gulili"}
Check MySQL's sql_sink_table table: it now contains the messages just written, joined with the matching dimension columns from ClickHouse.
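For example, given the two messages produced above and the dimension rows inserted earlier, a query along these lines should show the joined result (actual contents depend on what you produced):
-- run against the test database on mysqlserver
SELECT * FROM sql_sink_table ORDER BY id;
-- expected roughly:
--   id=1, name=sunhaiyang, test1=11, test2=101
--   id=2, name=gulili,     test1=12, test2=102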
Create two Kafka topics, one for the source to read from and one for the sink to write to
kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 3 --topic my_test3
kafka-topics.sh --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --replication-factor 3 --partitions 3 --topic my_test4
Create the SQL file chunjun-examples/sql/kafka/kafka_kafka_my.sql
CREATE TABLE source_test (
    id INT
    , name STRING
    , money DECIMAL
    , datethree TIMESTAMP
    , `partition` BIGINT METADATA VIRTUAL -- from Kafka connector
    , `topic` STRING METADATA VIRTUAL -- from Kafka connector
    , `leader-epoch` INT METADATA VIRTUAL -- from Kafka connector
    , `offset` BIGINT METADATA VIRTUAL -- from Kafka connector
    , ts TIMESTAMP(3) METADATA FROM 'timestamp' -- from Kafka connector
    , `timestamp-type` STRING METADATA VIRTUAL -- from Kafka connector
    , partition_id BIGINT METADATA FROM 'partition' VIRTUAL -- from Kafka connector
    , WATERMARK FOR datethree AS datethree - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka-x'
    ,'topic' = 'my_test3'
    ,'properties.bootstrap.servers' = 'kafka1:9092'
    ,'properties.group.id' = 'test1'
    ,'scan.startup.mode' = 'earliest-offset'
    ,'format' = 'json'
    ,'json.timestamp-format.standard' = 'SQL'
    ,'scan.parallelism' = '2'
);

CREATE TABLE sink_test (
    id INT
    , name STRING
    , money DECIMAL
    , datethree TIMESTAMP
    , `partition` BIGINT
    , `topic` STRING
    , `leader-epoch` INT
    , `offset` BIGINT
    , ts TIMESTAMP(3)
    , `timestamp-type` STRING
    , partition_id BIGINT
) WITH (
    'connector' = 'kafka-x'
    ,'topic' = 'my_test4'
    ,'properties.bootstrap.servers' = 'kafka1:9092'
    ,'format' = 'json'
    ,'sink.parallelism' = '2'
    ,'json.timestamp-format.standard' = 'SQL'
);

INSERT INTO sink_test
SELECT *
FROM source_test;
Write data to the Kafka topic my_test3
kafka-console-producer.sh --broker-list cdh1:9092 --topic my_test3
{"id":100,"name":"guocai","money":243.18,"datethree":"2023-07-03 22:00:00.000"}
{"id":101,"name":"hanmeimei","money":137.32,"datethree":"2023-07-03 22:00:01.000"}
Start the sync job
bash ./bin/chunjun-standalone.sh -job chunjun-examples/sql/kafka/kafka_kafka_my.sql
Check the data in the Kafka topic my_test4: the corresponding records have arrived, tagged with the Kafka metadata fields
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_test4 --from-beginning