Flink測試利器之DataGen初探

2023-10-13 15:01:26

什麼是 Flinksql

Flink SQL 是基於 Apache Calcite 的 SQL 解析器和優化器構建的,支援ANSI SQL 標準,允許使用標準的 SQL 語句來處理流式和批次處理資料。通過 Flink SQL,可以以宣告式的方式描述資料處理邏輯,而無需編寫顯式的程式碼。使用 Flink SQL,可以執行各種資料操作,如過濾、聚合、連線和轉換等。它還提供了視窗操作、時間處理和複雜事件處理等功能,以滿足流式資料處理的需求。

Flink SQL 提供了許多擴充套件功能和語法,以適應 Flink 的流式和批次處理引擎的特性。他是Flink最高階別的抽象,可以與 DataStream API 和 DataSet API 無縫整合,利用 Flink 的分散式計算能力和容錯機制。

使用 Flink SQL處理資料的基本步驟:

  1. 定義輸入表:使用 CREATE TABLE 語句定義輸入表,指定表的模式(欄位和型別)和資料來源(如 Kafka、檔案等)。

  2. 執行 SQL 查詢:使用 SELECT、INSERT INTO 等 SQL 語句來執行資料查詢和操作。您可以在 SQL 查詢中使用各種內建函數、聚合操作、視窗操作和時間屬性等。

  3. 定義輸出表:使用 CREATE TABLE 語句定義輸出表,指定表的模式和目標資料儲存(如 Kafka、檔案等)。

  4. 提交作業:將 Flink SQL 查詢作為 Flink 作業提交到 Flink 叢集中執行。Flink會根據查詢的邏輯和設定自動構建執行計劃,並將資料處理任務分發到叢集中的工作管理員進行執行。

總而言之,我們可以通過Flink SQL 查詢和操作來處理流式和批次處理資料。它提供了一種簡化和加速資料處理開發的方式,尤其適用於熟悉 SQL 的開發人員和資料工程師。

什麼是 connector

Flink Connector 是指用於連線外部系統和資料來源的元件。它允許 Flink 通過特定的聯結器與不同的資料來源進行互動,例如資料庫、訊息佇列、檔案系統等。它負責處理與外部系統的通訊、資料格式轉換、資料讀取和寫入等任務。無論是作為輸入資料表還是輸出資料表,通過使用適當的聯結器,可以在 Flink SQL 中存取和操作外部系統中的資料。目前實時平臺提供了很多常用的聯結器:

例如:

  1. JDBC :用於與關係型資料庫(如 MySQL、PostgreSQL)建立連線,並支援在 Flink SQL 中讀取和寫入資料庫表的資料。

  2. JDQ :用於與 JDQ 整合,可以讀取和寫入 JDQ 主題中的資料。

  3. Elasticsearch :用於與 Elasticsearch 整合,可以將資料寫入 Elasticsearch 索引或從索引中讀取資料。

  4. File Connector:用於讀取和寫入各種檔案格式(如 CSV、JSON、Parquet)的資料。

  5. ......

還有如HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive等,用於與不同的資料來源進行整合。通過使用 Flink SQL Connector,我們可以輕鬆地與外部系統進行資料互動,將資料匯入到 Flink 進行處理,或將處理結果匯出到外部系統。

DataGen Connector

DataGen 是 Flink SQL 提供的一個內建聯結器,用於生成模擬的測試資料,以便在開發和測試過程中使用。

使用 DataGen,可以生成具有不同資料型別和分佈的資料,例如整數、字串、日期等。這樣可以模擬真實的資料場景,並幫助驗證和偵錯 Flink SQL 查詢和操作。

demo

以下是一個使用 DataGen 函數的簡單範例:

-- 建立輸入表
CREATE TABLE input_table (
 order_number BIGINT,
 price DECIMAL(32,2),
 buyer ROW<first_name STRING, last_name STRING>,
 order_time TIMESTAMP(3)
) WITH (
 'connector' = 'datagen',
);

在上面的範例中,我們使用 DataGen 聯結器建立了一個名為 `input_table` 的輸入表。該表包含了 `order_number`、`price` 和 `buyer` ,`order_time`四個欄位。預設是random隨機生成對應型別的資料,生產速率是10000條/秒,只要任務不停,就會源源不斷的生產資料。當然也可以指定一些引數來定義生成資料的規則,例如每秒生成的行數、欄位的資料型別和分佈。

生成的資料樣例:

{"order_number":-6353089831284155505,"price":253422671148527900374700392448,"buyer":{"first_name":"6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2","last_name":"d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"},"order_time":"2023-09-21 06:22:29.618"}
{"order_number":1102733628546646982,"price":628524591222898424803263250432,"buyer":{"first_name":"4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c","last_name":"7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"},"order_time":"2023-09-21 06:23:01.69"}

支援的型別

欄位型別 資料生成方式
BOOLEAN random
CHAR random / sequence
VARCHAR random / sequence
STRING random / sequence
DECIMAL random / sequence
TINYINT random / sequence
SMALLINT random / sequence
INT random / sequence
BIGINT random / sequence
FLOAT random / sequence
DOUBLE random / sequence
DATE random
TIME random
TIMESTAMP random
TIMESTAMP_LTZ random
INTERVAL YEAR TO MONTH random
INTERVAL DAY TO MONTH random
ROW random
ARRAY random
MAP random
MULTISET random

聯結器屬性

屬性 是否必填 預設值 型別 描述
connector required (none) String 'datagen'.
rows-per-second optional 10000 Long 資料生產速率
number-of-rows optional (none) Long 指定生產的資料條數,預設是不限制。
fields.#.kind optional random String 指定欄位的生產資料的方式 random還是sequence
fields.#.min optional (Minimum value of type) (Type of field) random生成器 指定欄位 # 最小值, 支援數位型別
fields.#.max optional (Maximum value of type) (Type of field) random生成器的指定欄位 # 最大值, 支援數位型別
fields.#.length optional 100 Integer char/varchar/string/array/map/multiset 型別的長度.
fields.#.start optional (none) (Type of field) sequence生成器的開始值
fields.#.end optional (none) (Type of field) sequence生成器的結束值

DataGen使用

瞭解了dategen的基本使用方法,那麼下面來結合其他型別的聯結器實踐一下吧。

場景1 生成一億條資料到hive表

CREATE TABLE dataGenSourceTable
 (
 order_number BIGINT,
 price DECIMAL(10, 2),
 buyer STRING,
 order_time TIMESTAMP(3)
 )
WITH
 ( 'connector'='datagen', 
 'number-of-rows'='100000000',
 'rows-per-second' = '100000'
 ) ;


CREATECATALOG myhive
WITH (
 'type'='hive',
 'default-database'='default'
);
USECATALOG myhive;
USE dev;
SETtable.sql-dialect=hive;
CREATETABLEifnotexists shipu3_test_0932 (
 order_number BIGINT,
 price DECIMAL(10, 2),
 buyer STRING,
 order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
 'partition.time-extractor.timestamp-pattern'='$dt',
 'sink.partition-commit.trigger'='partition-time',
 'sink.partition-commit.delay'='1 h',
 'sink.partition-commit.policy.kind'='metastore,success-file'
);
SETtable.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number,price,buyer,order_time, cast( CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;

當每秒生產10萬條資料的時候,17分鐘左右就可以完成,當然我們可以通過增加Flink任務的計算節點、並行度、提高生產速率'rows-per-second'的值等來更快速的完成巨量資料量的生產。

場景2 持續每秒生產10萬條數到訊息佇列

CREATE TABLE dataGenSourceTable (
 order_number BIGINT,
 price INT,
 buyer ROW< first_name STRING, last_name STRING >,
 order_time TIMESTAMP(3),
 col_array ARRAY < STRING >,
 col_map map < STRING, STRING >
 )
WITH
 ( 'connector'='datagen', --聯結器型別
 'rows-per-second'='100000', --生產速率
 'fields.order_number.kind'='random', --欄位order_number的生產方式
 'fields.order_number.min'='1', --欄位order_number最小值
 'fields.order_number.max'='1000', --欄位order_number最大值
 'fields.price.kind'='sequence', --欄位price的生產方式
 'fields.price.start'='1', --欄位price開始值
 'fields.price.end'='1000', --欄位price最大值
 'fields.col_array.element.length'='5', --每個元素的長度
 'fields.col_map.key.length'='5', --map key的長度
 'fields.col_map.value.length'='5' --map value的長度
 ) ;
CREATE TABLE jdqsink1
 (
 order_number BIGINT,
 price DECIMAL(32, 2),
 buyer ROW< first_name STRING, last_name STRING >,
 order_time TIMESTAMP(3),
 col_ARRAY ARRAY < STRING >,
 col_map map < STRING, STRING >
 )
WITH
 (
 'connector'='jdq',
 'topic'='jrdw-fk-area_info__1',
 'jdq.client.id'='xxxxx',
 'jdq.password'='xxxxxxx',
 'jdq.domain'='db.test.group.com',
 'format'='json'
 ) ;
INSERTINTO jdqsink1
SELECT*FROM dataGenSourceTable;

思考

通過以上案例可以看到,通過Datagen結合其他聯結器可以模擬各種場景的資料

  • 效能測試:我們可以利用Flink的高處理效能,來偵錯任務的外部依賴的閾值(超時,限流等)到一個合適的水位,避免自己的任務有過多的外部依賴出現木桶效應;
  • 邊界條件測試:我們通過使用 Flink DataGen 生成特殊的測試資料,如最小值、最大值、空值、重複值等來驗證 Flink 任務在邊界條件下的正確性和魯棒性;
  • 資料完整性測試:我們通過Flink DataGen 可以生成包含錯誤或異常資料的資料集,如無效的資料格式、缺失的欄位、重複的資料等。從而可以測試 Flink 任務對異常情況的處理能力,驗證 Flink任務在處理資料時是否能夠正確地保持資料的完整性。

總之,Flink DataGen 是一個強大的工具,可以幫助測試人員構造各種型別的測試資料。通過合理的使用 ,測試人員可以更有效地進行測試,並行現潛在的問題和缺陷。

作者:京東零售 石樸

來源:京東雲開發者社群 轉載請註明來源