Flink SQL 是基於 Apache Calcite 的 SQL 解析器和優化器構建的,支援ANSI SQL 標準,允許使用標準的 SQL 語句來處理流式和批次處理資料。通過 Flink SQL,可以以宣告式的方式描述資料處理邏輯,而無需編寫顯式的程式碼。使用 Flink SQL,可以執行各種資料操作,如過濾、聚合、連線和轉換等。它還提供了視窗操作、時間處理和複雜事件處理等功能,以滿足流式資料處理的需求。
Flink SQL 提供了許多擴充套件功能和語法,以適應 Flink 的流式和批次處理引擎的特性。他是Flink最高階別的抽象,可以與 DataStream API 和 DataSet API 無縫整合,利用 Flink 的分散式計算能力和容錯機制。
使用 Flink SQL處理資料的基本步驟:
定義輸入表:使用 CREATE TABLE 語句定義輸入表,指定表的模式(欄位和型別)和資料來源(如 Kafka、檔案等)。
執行 SQL 查詢:使用 SELECT、INSERT INTO 等 SQL 語句來執行資料查詢和操作。您可以在 SQL 查詢中使用各種內建函數、聚合操作、視窗操作和時間屬性等。
定義輸出表:使用 CREATE TABLE 語句定義輸出表,指定表的模式和目標資料儲存(如 Kafka、檔案等)。
提交作業:將 Flink SQL 查詢作為 Flink 作業提交到 Flink 叢集中執行。Flink會根據查詢的邏輯和設定自動構建執行計劃,並將資料處理任務分發到叢集中的工作管理員進行執行。
總而言之,我們可以通過Flink SQL 查詢和操作來處理流式和批次處理資料。它提供了一種簡化和加速資料處理開發的方式,尤其適用於熟悉 SQL 的開發人員和資料工程師。
Flink Connector 是指用於連線外部系統和資料來源的元件。它允許 Flink 通過特定的聯結器與不同的資料來源進行互動,例如資料庫、訊息佇列、檔案系統等。它負責處理與外部系統的通訊、資料格式轉換、資料讀取和寫入等任務。無論是作為輸入資料表還是輸出資料表,通過使用適當的聯結器,可以在 Flink SQL 中存取和操作外部系統中的資料。目前實時平臺提供了很多常用的聯結器:
例如:
JDBC :用於與關係型資料庫(如 MySQL、PostgreSQL)建立連線,並支援在 Flink SQL 中讀取和寫入資料庫表的資料。
JDQ :用於與 JDQ 整合,可以讀取和寫入 JDQ 主題中的資料。
Elasticsearch :用於與 Elasticsearch 整合,可以將資料寫入 Elasticsearch 索引或從索引中讀取資料。
File Connector:用於讀取和寫入各種檔案格式(如 CSV、JSON、Parquet)的資料。
......
還有如HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive等,用於與不同的資料來源進行整合。通過使用 Flink SQL Connector,我們可以輕鬆地與外部系統進行資料互動,將資料匯入到 Flink 進行處理,或將處理結果匯出到外部系統。
DataGen 是 Flink SQL 提供的一個內建聯結器,用於生成模擬的測試資料,以便在開發和測試過程中使用。
使用 DataGen,可以生成具有不同資料型別和分佈的資料,例如整數、字串、日期等。這樣可以模擬真實的資料場景,並幫助驗證和偵錯 Flink SQL 查詢和操作。
以下是一個使用 DataGen 函數的簡單範例:
-- 建立輸入表
CREATE TABLE input_table (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW<first_name STRING, last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
);
在上面的範例中,我們使用 DataGen 聯結器建立了一個名為 `input_table` 的輸入表。該表包含了 `order_number`、`price` 和 `buyer` ,`order_time`四個欄位。預設是random隨機生成對應型別的資料,生產速率是10000條/秒,只要任務不停,就會源源不斷的生產資料。當然也可以指定一些引數來定義生成資料的規則,例如每秒生成的行數、欄位的資料型別和分佈。
生成的資料樣例:
{"order_number":-6353089831284155505,"price":253422671148527900374700392448,"buyer":{"first_name":"6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2","last_name":"d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"},"order_time":"2023-09-21 06:22:29.618"}
{"order_number":1102733628546646982,"price":628524591222898424803263250432,"buyer":{"first_name":"4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c","last_name":"7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"},"order_time":"2023-09-21 06:23:01.69"}
欄位型別 | 資料生成方式 |
---|---|
BOOLEAN | random |
CHAR | random / sequence |
VARCHAR | random / sequence |
STRING | random / sequence |
DECIMAL | random / sequence |
TINYINT | random / sequence |
SMALLINT | random / sequence |
INT | random / sequence |
BIGINT | random / sequence |
FLOAT | random / sequence |
DOUBLE | random / sequence |
DATE | random |
TIME | random |
TIMESTAMP | random |
TIMESTAMP_LTZ | random |
INTERVAL YEAR TO MONTH | random |
INTERVAL DAY TO MONTH | random |
ROW | random |
ARRAY | random |
MAP | random |
MULTISET | random |
屬性 | 是否必填 | 預設值 | 型別 | 描述 |
---|---|---|---|---|
connector | required | (none) | String | 'datagen'. |
rows-per-second | optional | 10000 | Long | 資料生產速率 |
number-of-rows | optional | (none) | Long | 指定生產的資料條數,預設是不限制。 |
fields.#.kind | optional | random | String | 指定欄位的生產資料的方式 random還是sequence |
fields.#.min | optional | (Minimum value of type) | (Type of field) | random生成器 指定欄位 # 最小值, 支援數位型別 |
fields.#.max | optional | (Maximum value of type) | (Type of field) | random生成器的指定欄位 # 最大值, 支援數位型別 |
fields.#.length | optional | 100 | Integer | char/varchar/string/array/map/multiset 型別的長度. |
fields.#.start | optional | (none) | (Type of field) | sequence生成器的開始值 |
fields.#.end | optional | (none) | (Type of field) | sequence生成器的結束值 |
瞭解了dategen的基本使用方法,那麼下面來結合其他型別的聯結器實踐一下吧。
CREATE TABLE dataGenSourceTable
(
order_number BIGINT,
price DECIMAL(10, 2),
buyer STRING,
order_time TIMESTAMP(3)
)
WITH
( 'connector'='datagen',
'number-of-rows'='100000000',
'rows-per-second' = '100000'
) ;
CREATECATALOG myhive
WITH (
'type'='hive',
'default-database'='default'
);
USECATALOG myhive;
USE dev;
SETtable.sql-dialect=hive;
CREATETABLEifnotexists shipu3_test_0932 (
order_number BIGINT,
price DECIMAL(10, 2),
buyer STRING,
order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
SETtable.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number,price,buyer,order_time, cast( CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;
當每秒生產10萬條資料的時候,17分鐘左右就可以完成,當然我們可以通過增加Flink任務的計算節點、並行度、提高生產速率'rows-per-second'的值等來更快速的完成巨量資料量的生產。
CREATE TABLE dataGenSourceTable (
order_number BIGINT,
price INT,
buyer ROW< first_name STRING, last_name STRING >,
order_time TIMESTAMP(3),
col_array ARRAY < STRING >,
col_map map < STRING, STRING >
)
WITH
( 'connector'='datagen', --聯結器型別
'rows-per-second'='100000', --生產速率
'fields.order_number.kind'='random', --欄位order_number的生產方式
'fields.order_number.min'='1', --欄位order_number最小值
'fields.order_number.max'='1000', --欄位order_number最大值
'fields.price.kind'='sequence', --欄位price的生產方式
'fields.price.start'='1', --欄位price開始值
'fields.price.end'='1000', --欄位price最大值
'fields.col_array.element.length'='5', --每個元素的長度
'fields.col_map.key.length'='5', --map key的長度
'fields.col_map.value.length'='5' --map value的長度
) ;
CREATE TABLE jdqsink1
(
order_number BIGINT,
price DECIMAL(32, 2),
buyer ROW< first_name STRING, last_name STRING >,
order_time TIMESTAMP(3),
col_ARRAY ARRAY < STRING >,
col_map map < STRING, STRING >
)
WITH
(
'connector'='jdq',
'topic'='jrdw-fk-area_info__1',
'jdq.client.id'='xxxxx',
'jdq.password'='xxxxxxx',
'jdq.domain'='db.test.group.com',
'format'='json'
) ;
INSERTINTO jdqsink1
SELECT*FROM dataGenSourceTable;
通過以上案例可以看到,通過Datagen結合其他聯結器可以模擬各種場景的資料
總之,Flink DataGen 是一個強大的工具,可以幫助測試人員構造各種型別的測試資料。通過合理的使用 ,測試人員可以更有效地進行測試,並行現潛在的問題和缺陷。
作者:京東零售 石樸
來源:京東雲開發者社群 轉載請註明來源