Getting Started with Flink CDC, a Mainstream Open-Source Change Data Capture Technology

2023-06-28 06:00:27

Overview

Definition

flink-cdc-connectors official site: https://github.com/ververica/flink-cdc-connectors (latest source release: 2.4.0)

flink-cdc-connectors documentation: https://ververica.github.io/flink-cdc-connectors/master/

flink-cdc-connectors source code: https://github.com/ververica/flink-cdc-connectors

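CDC Connectors for Apache Flink is a set of source connectors for Apache Flink that ingest changes from different databases using change data capture (CDC). It integrates Debezium as the engine for capturing data changes, so it can take full advantage of Debezium's capabilities.

Flink CDC is the source component of flink-cdc-connectors, developed by the Flink community. Built on Change Data Capture over database logs, it reads MySQL, PostgreSQL, and other databases in a unified way that covers both the full snapshot and incremental changes. Drawing on Flink's pipeline capabilities and its rich upstream and downstream ecosystem, it captures changes from many kinds of databases and synchronizes them to downstream storage in real time.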

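What is CDC?

A brief explanation: CDC is the abbreviation of the three words Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes on data or tables), record those changes completely in the order they occur, and write them to message middleware for other services to subscribe to and consume.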
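The idea of recording changes in order and letting a consumer replay them can be sketched in a few lines of plain Java. This is only an illustration, not Flink CDC code: the `ChangeEvent` class, its fields, and the sample product rows are assumptions made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ChangeEventReplay {
    /** Hypothetical change event: op is "c" (insert), "u" (update) or "d" (delete). */
    static final class ChangeEvent {
        final String op;
        final int id;
        final String name; // row content after the change; null for a delete

        ChangeEvent(String op, int id, String name) {
            this.op = op;
            this.id = id;
            this.name = name;
        }
    }

    /** Applies the events in the order they occurred and returns the resulting rows. */
    static Map<Integer, String> replay(List<ChangeEvent> events) {
        Map<Integer, String> rows = new TreeMap<>();
        for (ChangeEvent event : events) {
            if ("d".equals(event.op)) {
                rows.remove(event.id);
            } else {
                rows.put(event.id, event.name); // insert and update both store the new row
            }
        }
        return rows;
    }

    /** Sample change stream for a hypothetical product table. */
    static List<ChangeEvent> sampleEvents() {
        return Arrays.asList(
                new ChangeEvent("c", 1, "iphone13"),
                new ChangeEvent("c", 2, "ipad"),
                new ChangeEvent("u", 1, "iphone14"),
                new ChangeEvent("d", 2, null));
    }

    public static void main(String[] args) {
        System.out.println(replay(sampleEvents())); // prints {1=iphone14}
    }
}
```

Because the events are applied in the order they happened, the consumer ends up with the same rows as the source table. Replaying them in a different order could leave a stale or deleted row behind.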

Types of CDC

CDC falls into two main categories, query-based CDC and binlog-based CDC. The main differences between the two are as follows:
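One well-known difference is that a query-based approach only sees the table state at each poll, while a log-based approach sees every individual change. The small simulation below illustrates this under simplifying assumptions: the in-memory `Table` class stands in for a database with a binlog-like change log, and polling is modeled as diffing two snapshots (real query-based tools more commonly filter on an update-timestamp column). All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class QueryVsLogCdc {
    /** In-memory stand-in for a table that also appends every write to a change log. */
    static final class Table {
        final Map<Integer, String> rows = new TreeMap<>();
        final List<String> log = new ArrayList<>();

        void upsert(int id, String value) {
            String op = rows.containsKey(id) ? "update" : "insert";
            rows.put(id, value);
            log.add(op + " " + id + "=" + value);
        }

        void delete(int id) {
            rows.remove(id);
            log.add("delete " + id);
        }
    }

    /** Query-based capture: compare the previous poll's snapshot with the current one. */
    static List<String> diff(Map<Integer, String> previous, Map<Integer, String> current) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<Integer, String> row : current.entrySet()) {
            String old = previous.get(row.getKey());
            if (old == null) {
                changes.add("insert " + row.getKey() + "=" + row.getValue());
            } else if (!old.equals(row.getValue())) {
                changes.add("update " + row.getKey() + "=" + row.getValue());
            }
        }
        for (Integer id : previous.keySet()) {
            if (!current.containsKey(id)) {
                changes.add("delete " + id);
            }
        }
        return changes;
    }

    /** Returns two lists: changes seen by polling, then changes recorded in the log. */
    static List<List<String>> runScenario() {
        Table table = new Table();
        table.upsert(1, "iphone13");
        Map<Integer, String> firstPoll = new TreeMap<>(table.rows); // poll #1
        int logStart = table.log.size();                            // log position at poll #1

        // Four writes happen before the next poll.
        table.upsert(1, "iphone14");
        table.upsert(1, "iphone15");
        table.upsert(2, "ipad");
        table.delete(2);

        Map<Integer, String> secondPoll = new TreeMap<>(table.rows); // poll #2
        List<String> polled = diff(firstPoll, secondPoll);
        List<String> logged = new ArrayList<>(table.log.subList(logStart, table.log.size()));
        return Arrays.asList(polled, logged);
    }

    public static void main(String[] args) {
        List<List<String>> result = runScenario();
        System.out.println("polling sees: " + result.get(0));
        System.out.println("log sees:     " + result.get(1));
    }
}
```

In this scenario polling reports a single update (the final value of row 1), while the log contains all four writes, including the row that was inserted and deleted between the two polls.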

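Features

  • Supports reading a database snapshot and then continuing to read the transaction log, with exactly-once processing even when failures occur.
  • CDC connectors for the DataStream API: users can consume changes on multiple databases and tables in a single job without deploying Debezium and Kafka.
  • CDC connectors for the Table/SQL API: users can create a CDC source with SQL DDL to monitor changes on a single table.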

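Use Cases

  • Data distribution: fan one data source out to multiple downstream consumers, commonly used for business decoupling and microservices.
  • Data integration: consolidate scattered, heterogeneous data sources into a data warehouse, eliminating data silos and simplifying later analysis.
  • Data migration: commonly used for database backup and disaster recovery.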

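Supported Data Sources

CDC Connectors for Apache Flink can ingest snapshot data and real-time changes from many databases into Flink, then transform them and sink them to various downstream systems.

The supported data sources include the following: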

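Hands-On

This walkthrough uses MySQL as the data source and captures data changes with flink-connector-mysql-cdc. First prepare a MySQL environment. Here we reuse the environment from the earlier article "Hands-On with Maxwell, a Lightweight Tool for Real-Time MySQL Data Collection". There are two databases, my_maxwell_01 and my_maxwell_02, and each contains the same account and product tables. Add the following dependencies to the pom file: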

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itxs.flink</groupId>
    <artifactId>flink-cdc-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.1</flink.version>
        <flink.cdc.version>2.4.0</flink.cdc.version>
        <mysql.client.version>8.0.29</mysql.client.version>
        <fastjson.version>1.2.83</fastjson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.client.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink.cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Create DataStreamDemo.java:

package cn.itxs.cdc;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamDemo {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("mysqlserver")
                .port(3306)
                .databaseList("my_maxwell_01,my_maxwell_02")
                .tableList("my_maxwell_01.*,my_maxwell_02.product")
                .username("root")
                .password("12345678")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts each SourceRecord to a JSON string
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing every 3000 ms
        env.enableCheckpointing(3000);

        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set the source parallelism to 4
                .setParallelism(4)
                .print().setParallelism(1); // use parallelism 1 for the print sink to keep message order

        env.execute("Print MySQL Snapshot + Binlog");
    }
}

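Because the Flink dependencies above are declared with provided scope, launching from IDEA requires enabling the run-configuration option that adds dependencies with "provided" scope to the classpath.

Start the program and check the log. It first reads the current full data from MySQL, and for my_maxwell_02 only the product table is read.

After modifying tables in both databases, the corresponding change events appear. This also confirms that changes to account in my_maxwell_02 are not captured.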
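Why only product is read from my_maxwell_02 follows from the tableList value in DataStreamDemo. As a rough sketch, assuming each comma-separated entry is treated as a regular expression that must match the full `db.table` name, the selection can be reproduced with `java.util.regex`. The `TableListFilter` class is hypothetical and is not the connector's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class TableListFilter {
    /** Returns the tables whose "db.table" name fully matches any comma-separated pattern. */
    static List<String> capturedTables(String tableList, List<String> allTables) {
        List<Pattern> patterns = new ArrayList<>();
        for (String entry : tableList.split(",")) {
            patterns.add(Pattern.compile(entry.trim()));
        }
        List<String> captured = new ArrayList<>();
        for (String table : allTables) {
            for (Pattern pattern : patterns) {
                if (pattern.matcher(table).matches()) {
                    captured.add(table);
                    break; // one matching pattern is enough
                }
            }
        }
        return captured;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList(
                "my_maxwell_01.account", "my_maxwell_01.product",
                "my_maxwell_02.account", "my_maxwell_02.product");
        // prints [my_maxwell_01.account, my_maxwell_01.product, my_maxwell_02.product]
        System.out.println(capturedTables("my_maxwell_01.*,my_maxwell_02.product", all));
    }
}
```

Note that under this regex reading the `.` in `my_maxwell_01.*` is a wildcard rather than a literal dot, so the pattern is slightly broader than it looks.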

{"before":{"id":7,"name":"李丹","age":44},"after":{"id":7,"name":"李丹","age":48},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856595000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"account","server_id":1,"gtid":null,"file":"binlog.000025","pos":2798,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856598620,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856605000,"snapshot":"false","db":"my_maxwell_01","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3140,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856608748,"transaction":null}
{"before":{"id":1,"name":"iphone13","type":1},"after":{"id":1,"name":"iphone14","type":1},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1687856628000,"snapshot":"false","db":"my_maxwell_02","sequence":null,"table":"product","server_id":1,"gtid":null,"file":"binlog.000025","pos":3486,"row":0,"thread":330184,"query":null},"op":"u","ts_ms":1687856631643,"transaction":null}
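Each event carries an `op` field that tells what kind of change it is. In Debezium's event format, `c` is an insert, `u` an update, `d` a delete, and `r` a row read during the snapshot phase. The sketch below pulls that field out with a regular expression purely to stay dependency-free. The `ChangeEventOp` class is hypothetical, and a real job would more likely parse the event with a JSON library such as the fastjson dependency already in the pom.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ChangeEventOp {
    // Matches the "op" field as it is printed in the events above, e.g. "op":"u".
    // Assumption: no column value contains the same text; a JSON parser avoids this risk.
    private static final Pattern OP = Pattern.compile("\"op\":\"([a-z])\"");

    /** Returns a readable name for the operation in a Debezium-style JSON event. */
    static String describe(String eventJson) {
        Matcher matcher = OP.matcher(eventJson);
        if (!matcher.find()) {
            return "unknown";
        }
        switch (matcher.group(1)) {
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "snapshot read";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Shortened, hypothetical event in the same shape as the output above.
        String event = "{\"before\":{\"id\":7,\"age\":44},\"after\":{\"id\":7,\"age\":48},\"op\":\"u\"}";
        System.out.println(describe(event)); // prints update
    }
}
```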

Package the job, upload it to the cluster, and run:

bin/flink run -m hadoop1:8081 -c cn.itxs.cdc.DataStreamDemo ./lib/flink-cdc-demo-1.0-SNAPSHOT.jar 

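The job log shows that both the full table data and the incremental changes just made are output successfully.

To resume from where the job left off after a restart, persist checkpoints to durable checkpoint storage: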

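        // Requires imports: org.apache.flink.streaming.api.environment.CheckpointConfig
        // and java.util.concurrent.TimeUnit.
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // Store checkpoints on HDFS so that a restarted job can restore from them.
        checkpointConfig.setCheckpointStorage("hdfs://hadoop111:9000/checkpoints/flink/cdc");
        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
        // Keep the latest checkpoint even when the job is cancelled.
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);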
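The reason a persisted checkpoint allows resuming without losing or double-counting data can be shown with a toy model. This is not Flink's checkpointing mechanism, only a sketch of the principle: a checkpoint stores the read position together with the state computed up to that position, and a restart restores both. The `CheckpointResume` class and its parameters are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class CheckpointResume {
    /** Hypothetical checkpoint: the log position plus the state computed up to it. */
    static final class Checkpoint {
        final int offset;
        final int sum;

        Checkpoint(int offset, int sum) {
            this.offset = offset;
            this.sum = sum;
        }
    }

    /**
     * Sums the log starting from the given checkpoint, taking a new checkpoint
     * every `interval` events. If failAt >= 0, the run "crashes" just before
     * processing that offset and returns the last completed checkpoint;
     * otherwise it returns the final position and sum.
     */
    static Checkpoint run(List<Integer> log, Checkpoint start, int interval, int failAt) {
        int sum = start.sum;
        Checkpoint last = start;
        for (int offset = start.offset; offset < log.size(); offset++) {
            if (offset == failAt) {
                return last; // work done since the last checkpoint is discarded
            }
            sum += log.get(offset);
            if ((offset + 1) % interval == 0) {
                last = new Checkpoint(offset + 1, sum);
            }
        }
        return new Checkpoint(log.size(), sum);
    }

    public static void main(String[] args) {
        List<Integer> log = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
        Checkpoint restored = run(log, new Checkpoint(0, 0), 3, 5); // crash before offset 5
        Checkpoint done = run(log, restored, 3, -1);                // restart from the checkpoint
        System.out.println(restored.offset + " " + done.sum);       // prints 3 28
    }
}
```

The events at offsets 3 and 4 are read twice, but because the sum is rolled back together with the position, each event contributes to the final state exactly once.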

Flink SQL Code Example

Create SqlDemo.java:

package cn.itxs.cdc;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE account (\n" +
                " id INT NOT NULL,\n" +
                " name STRING,\n" +
                " age INT,\n" +
                " PRIMARY KEY(id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = 'mysqlserver',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '12345678',\n" +
                " 'database-name' = 'my_maxwell_01',\n" +
                " 'table-name' = 'account'\n" +
                ");");

        Table table = tableEnv.sqlQuery("select * from account");
        DataStream<Row> rowDataStream = tableEnv.toChangelogStream(table);
        rowDataStream.print("account_binlog====");
        env.execute();
    }
}

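Start the program and check the log. It reads the full data of the account table in my_maxwell_01 from MySQL, and after the table data is modified the changes are captured as well.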
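Rows in a Flink changelog stream carry a row kind, whose short forms are `+I` (insert), `-U` (update before), `+U` (update after), and `-D` (delete). As a rough illustration of how the change events seen earlier relate to such rows, the sketch below maps an operation code to the changelog rows one would expect. The `ChangelogRows` class and the sample row text are hypothetical, and the exact lines printed by the job may differ.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ChangelogRows {
    /**
     * Maps a Debezium-style operation to changelog rows.
     * op: "c" insert, "r" snapshot read, "u" update, "d" delete.
     */
    static List<String> toChangelog(String op, String before, String after) {
        switch (op) {
            case "c":
            case "r":
                return Collections.singletonList("+I" + after);
            case "u":
                // an update is represented as a retraction of the old row plus the new row
                return Arrays.asList("-U" + before, "+U" + after);
            case "d":
                return Collections.singletonList("-D" + before);
            default:
                return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        // Hypothetical account row (id, name, age) whose age changes from 30 to 31.
        System.out.println(toChangelog("u", "[1, alice, 30]", "[1, alice, 31]"));
        // prints [-U[1, alice, 30], +U[1, alice, 31]]
    }
}
```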

  • Author's blog: IT小神 www.itxiaoshen.com