@
flink-cdc-connectors 官網 https://github.com/ververica/flink-cdc-connectors 原始碼release最新版本2.4.0
flink-cdc-connectors 檔案地址 https://ververica.github.io/flink-cdc-connectors/master/
flink-cdc-connectors 原始碼地址 https://github.com/ververica/flink-cdc-connectors
CDC Connectors for Apache Flink 是Apache Flink的一組源聯結器,使用更改資料捕獲(CDC)從不同的資料庫攝取更改,其整合了Debezium作為捕獲資料變化的引擎,因此它可以充分利用Debezium的能力。
Flink CDC是由Flink社群開發的flink-cdc-connectors 的source元件,基於資料庫紀錄檔的 Change Data Caputre 技術,實現了從 MySQL、PostgreSQL 等資料庫全量和增量的一體化讀取能力,並藉助 Flink 優秀的管道能力和豐富的上下游生態,支援捕獲多種資料庫的變更,並將這些變更實時同步到下游儲存。
這裡也簡單說明下,CDC為三個英文Change Data Capture(變更資料捕獲)的縮寫,核心思想是監測並捕獲資料庫的變動(包括資料或資料表的插入、更新以及刪除等),將這些變更按發生的順序完整記錄下來,寫入到訊息中介軟體中以供其它服務進行訂閱及消費。
CDC主要分為基於查詢的CDC和基於binlog的CDC,兩者之間區別主要如下:
CDC Connectors for Apache Flink支援從多種資料庫到Flink攝取快照資料和實時更改,然後轉換和下沉到各種下游系統
支撐資料來源包括如下:
這裡以MySQL作為資料來源為例,通過flink-connector-mysql-cdc實現資料變更獲取,先準備MySQL環境,這裡複用前面<<實時採集MySQL資料之輕量工具Maxwell實操>>的文章環境,資料庫有兩個my_maxwell_01,my_maxwell_02,每個資料庫都有相同account和product表。pom檔案引入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itxs.flink</groupId>
<artifactId>flink-cdc-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.17.1</flink.version>
<flink.cdc.version>2.4.0</flink.cdc.version>
<mysql.client.version>8.0.29</mysql.client.version>
<fastjson.version>1.2.83</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.client.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flink.cdc.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
建立DataStreamDemo.java,
package cn.itxs.cdc;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataStreamDemo {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("mysqlserver")
.port(3306)
.databaseList("my_maxwell_01,my_maxwell_02")
.tableList("my_maxwell_01.*,my_maxwell_02.product")
.username("root")
.password("12345678")
.deserializer(new JsonDebeziumDeserializationSchema()) // 將SourceRecord轉換為JSON字串
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟checkpoint
env.enableCheckpointing(3000);
env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// 設定平行度為4
.setParallelism(4)
.print().setParallelism(1); // 對sink列印使用並行性1來保持訊息順序
env.execute("Print MySQL Snapshot + Binlog");
}
}
由於上面flink的依賴設定
啟動程式,檢視紀錄檔可以看到從mysql讀取目前全量的資料,my_maxwell_02也唯讀取product表資料
修改兩個庫的表後可以看到相應修改資訊,其中也確認my_maxwell_02的account沒有讀取變更資料。
{"before":{"id":7,"name":"李丹","age":44},"after":{"id":7,"name":"李丹","age":48},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856595000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"account","server_id":1,"gtid":null,"file":"binlog.000025","pos":2798,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856598620,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856605000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3140,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856608748,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856628000,"snapshot":"false","db":"my_maxwell_02","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3486,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856631643,"transaction":null}
打包後放到叢集上,執行
bin/flink run -m hadoop1:8081 -c cn.itxs.cdc.DataStreamDemo ./lib/flink-cdc-demo-1.0-SNAPSHOT.jar
可以看到的紀錄檔也成功輸出表的全量的紀錄檔和剛才修改增量資料
如果需要斷點續傳可以使用狀態後端儲存來實現
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("hdfs://hadoop111:9000/checkpoints/flink/cdc");
checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));
checkpointConfig.setTolerableCheckpointFailureNumber(5);
checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
建立SqlDemo.java檔案
package cn.itxs.cdc;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class SqlDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE account (\n" +
" id INT NOT NULL,\n" +
" name STRING,\n" +
" age INT,\n" +
" PRIMARY KEY(id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = 'mysqlserver',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = '12345678',\n" +
" 'database-name' = 'my_maxwell_01',\n" +
" 'table-name' = 'account'\n" +
");");
Table table = tableEnv.sqlQuery("select * from account");
DataStream<Row> rowDataStream = tableEnv.toChangelogStream(table);
rowDataStream.print("account_binlog====");
env.execute();
}
}
啟動程式,檢視紀錄檔可以看到從mysql讀取my_maxwell_01庫account表的全量的資料,修改表資料也確認讀取變更資料。