FlinkSQL 之亂序問題

2022-06-10 06:06:56

亂序問題

在業務編寫 FlinkSQL 時, 非常常見的就是亂序相關問題, 在出現問題時,非常難以排查,且無法穩定復現,這樣無論是業務方,還是平臺方,都處於一種非常尷尬的地步。

亂序問題

在業務編寫 FlinkSQL 時, 非常常見的就是亂序相關問題, 在出現問題時,非常難以排查,且無法穩定復現,這樣無論是業務方,還是平臺方,都處於一種非常尷尬的地步。

在實時 join 中, 如果是 Regular Join, 則使用的是 Hash Join 方式, 左表和右表根據 Join Key 進行hash,保證具有相同 Join Key 的資料能夠 Hash 到同一個並行,進行 join 的計算 。
在實時聚合中, 主要普通的 group window, over window, time window 這幾中開窗方式,都涉及到 task 和 task 之間 hash 方式進行資料傳輸。
因此, 在比較複雜的邏輯中, 一條資料在整個資料流中需要進行不同的 hash 方式, 特別時當我們處理 CDC 資料時, 一定要要求資料嚴格有序, 否則可能會導致產生錯誤的結果。

以下面的例子進行說明, 以下有三張表, 分別是訂單表, 訂單明細表, 和商品類目 。

  • 這三張表的實時資料都從 MySQL 採集得到並實時寫入 Kafka, 均會實時發生變化, 無法使用視窗計算
  • 除了訂單表有訂單時間, 其他兩張表都沒有時間屬性, 因此無法使用watermark

CREATE TABLE orders (
	order_id VARCHAR,
	order_time TIMESTAMP
) WITH (
	'connector' = 'kafka',
	'format' = 'changelog-json'
	...
);

CREATE TABLE order_item (
	order_id VARCHAR,
	item_id VARCHAR
) WITH (
	'connector' = 'kafka',)
	'format' = 'changelog-json'
	...
);

CREATE TABLE item_detail (
	item_id VARCHAR,
	item_name VARCHAR,
	item_price BIGINT
) WITH (
	'connector' = 'kafka',
	'format' = 'changelog-json'
	...
);

使用 Regular Join 進行多路 Join,資料表打寬操作如下所示


SELECT o.order_id, i.item_id, d.item_name, d.item_price, o.order_time
FROM orders o
LEFT JOIN order_item i ON o.order_id = i.order_id
LEFT JOIN item_detail d ON i.item_id = d.item_id

最終生成的 DAG 圖如下所示:

可以發現:
第一個 join (後面統一簡稱為ijoin1)的條件是 order_id,該 join 的兩個輸入會以 order_id 進行hash,具有相同 order_id 的資料能夠被傳送到同一個 subtask

第二個 join (後面統一簡稱為 join2)的條件則是 item_id, 該 join 的兩個輸入會以 item_id 進行hash,具有相同 item_id 的資料則會被傳送到同一個 subtask.

正常情況下, 具有相同 order_id 的資料, 一定具有相同的 item_id,但由於上面的範例程式碼中,我們使用的是 left join 的寫法, 即使沒有 join 上, 也會輸出為 null 的資料,這樣可能導致了最終結果的不確定性。

以下面的資料為範例,再詳細說明一下:

TABLE orders

order_id order_time
id_001 2022-06-03 00:00:00

TABLE order_item

order_id item_id
id_001 item_001

TABLE item_detail

item_id item_name item_price
item_001 類目1 10

輸出資料如下:
1) 表示輸出資料的並行
+I 表示資料的屬性 (+I, , -D, -U, +U)
第一個 JOIN 輸出

1) +I(id_001, null, 2022-06-03 00:00:00)
1) -D(id_001, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, 2022-06-03 00:00:00)

第二個 JOIN 輸出

1) +I(id_001, null, null, null, 2022-06-03 00:00:00)
1) -D(id_001, null, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)

以上結果只是上述作業可能出現的情況之一,實際執行時並不一定會出現。 我們可以發現 join1 結果傳送到 join2 之後, 相同的 order_id 並不一定會傳送到同一個 subtask,因此當資料經過了 join2, 相同的 order_id 的資料會落到不同的並行,這樣在後續的資料處理中, 有非常大的概率會導致最終結果的不確定性。

我們再細分以下場景考慮, 假設經過 join2 之後的結果為 join_view:

  1. 假設 join2 之後,我們基於 item_id 進行聚合, 統計相同類目的訂單數
SELECT item_id, sum(order_id)
FROM join_view
GROUP BY item_id

很顯然, 上述的亂序問題並不會影響這段邏輯的結果, item_id 為 null 的資料會進行計算, 但並不會影響 item_id 為 item_001 的結果.

  1. 假設 join2 之後, 我們將結果直接寫入 MySQL, MySQL 主鍵為 order_id
CREATE TABLE MySQL_Sink (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id) NOT ENFORCED
) with (
	'connector' = 'jdbc'
);

INSERT INTO MySQL_Sink SELECT * FROM JOIN_VIEW;

由於我們在 Sink connector 中未單獨設定並行, 因此 sink 的並行度是和 join2 的並行是一樣的, 因此 join2 的輸出會直接傳送給 sink 運算元, 並寫入到 MySQL 中。
由於是不同並行同時在寫 MySQL ,所以實際寫 MySQL的順序可能如下所示:

2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
1) +I(id_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, null, null, 2022-06-03 00:00:00)

很顯然, 最終結果會是 為 空, 最終寫入的是一條 delete 資料

  1. 假設 join2 之後, 我們將結果直接寫入 MySQL, 主鍵為 order_id, item_id, item_name
CREATE TABLE MySQL_Sink (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id) NOT ENFORCED
) with (
	'connector' = 'jdbc'
);

INSERT INTO MySQL_Sink SELECT * FROM JOIN_VIEW;

和範例2一樣, 我們未單獨設定 sink 的並行, 因此資料會之間傳送到 sink 運算元, 假設寫入 MySQL 的順序和範例2一樣:

2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
1) +I(id_001, null, null, null, 2022-06-03 00:00:00)
1) -D(id_001, null, null, null, 2022-06-03 00:00:00)

最終結果會是

2) +I(id_001, item_001, 類目1, 2022-06-03 00:00:00)

由於 MySQL 的主鍵是 order_id, item_id, item_name 所以最後的 -D 記錄並不會刪除 subtask 2 寫入的資料, 這樣最終的結果是正確的。

  1. 假設 join2 之後, 我們將結果寫入 kafka,寫入格式為 changelog-json , 下游作業消費 kafka 並進行處理
CREATE TABLE kafka_sink (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id, item_id, item_name) NOT ENFORCED
) with (
	'connector' = 'kafka',
	'format' = 'changelog-json',
	'topic' = 'join_result'
);

INSERT INTO kafka_sink select * from JOIN_VIEW;

預設,如果不設定 partitioner, kafka sink 會以我們在 DDL 中設定的主鍵生成對應的 hash key, 用於通過 hash 值生成 partition id。
有一點我們需要注意, 由於 join2 的輸出已經在不同的並行了, 所以無論 kafka_sink 選擇以 order_id 作為唯一的主鍵, 還是以 order_id, item_id, item_name 作為主鍵, 我們都無法控制不同並行寫入 kafka 的順序, 我們只能確保相同的並行的資料能夠有序的被寫入 kafka 的同一 partition 。

  • 如果設定 order_id 為主鍵, 我們可以保證上述的所有資料能夠被寫入同一個 partition
  • 如果設定 order_id, item_id, item_name 則上面不同並行的輸出可能會被寫入到不同的 partition

所以,我們需要關注的是, 當資料寫入 kafka 之後, 下游怎麼去處理這一份資料:

  1. 基於 order_id 進行去重,並按天聚合,計算當天的累加值。
    以下面的 SQL 為例, 下游在消費 kafka 時, 為了避免資料重複, 先基於 order_id 做了一次去重, 用 order_id 作為分割區條件, 基於proctime() 進行去重 (增加table.exec.source.cdc-events-duplicate 該引數, 框架會自動生成去重運算元).
-- https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
set 'table.exec.source.cdc-events-duplicate'='true';

CREATE TABLE kafka_source (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id) NOT ENFORCED
) with (
	'connector' = 'kafka',
	'format' = 'changelog-json',
	'topic' = 'join_result'
);

-- 按order_time 聚合, 計算每天的營收

SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd'), sum(item_price)
FROM kafka_source
GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')

結果上述的計算,我們預計結果會如何輸出:
去重之後可能的輸出為:

1) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
1) -D(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
1) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)

經過聚合運算元運算元:

1) +I(2022-06-03, null)
1) -D(2022-06-03, null)
1) +I(2022-06-03, 10)
1) -D(2022-06-03, 10)
1) +I(2022-06-03, 0)
1) -D(2022-06-03, 0)
1) +I(2022-06-03, null)
1) -D(2022-06-03, null)

可以發現,最終輸出結果為2022-06-03, null,本文列舉的範例不夠完善, 正常情況下, 當天肯定會有其他的記錄, 結果當天的結果可能不會為 null, 但我們可以知道的是,由於資料的亂序, 資料和實際結果已經不準確了。
2) 基於order_id, item_id, item_name 去重,之後按天聚合,計算當天的累加值。

-- https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
set 'table.exec.source.cdc-events-duplicate'='true';

CREATE TABLE kafka_source (
	order_id VARCHAR,
	item_id VARCHAR,
	item_name VARCHAR,
	item_price INT,
	order_time TIMESTAMP,
	PRIMARY KEY (order_id, item_id, item_name) NOT ENFORCED
) with (
	'connector' = 'kafka',
	'format' = 'changelog-json',
	'topic' = 'join_result'
);

-- 按order_time 聚合, 計算每天的營收

SELECT DATE_FORMAT(order_time, 'yyyy-MM-dd'), sum(item_price)
FROM kafka_source
GROUP BY DATE_FORMAT(order_time, 'yyyy-MM-dd')

去重之後的輸出:

1) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00)
2) +I(id_001, item_001, null, null, 2022-06-03 00:00:00)
2) -D(id_001, item_001, null, null, 2022-06-03 00:00:00)

由於我們主鍵設定的是 order_id, item_id, item_name 所以
(id_001, item_001, null, null, 2022-06-03 00:00:00)
1) +I(id_001, item_001, 類目1, 10, 2022-06-03 00:00:00) 是不同的主鍵, 所以並不會互相影響。
經過聚合之後的結果:

1) +I(2022-06-03, null)
1) -D(2022-06-03, null)
1) +I(2022-06-03, 10)
1) -D(2022-06-03, 10)
1) +I(2022-06-03, 10)
1) -D(2022-06-03, 10)
1) +I(2022-06-03, 10)

以上就是最終結果的輸出, 可以發現我們最終的結果是沒有問題的。

原因分析

下圖是原始作業的資料流轉變化情況

graph LR orders(orders) --> |hash:order_id| join1(join1) order_item(order_item) -->|hash:order_id| join1 join1 --> |hash:item_id| join2(join2) item_detail(item_detail) --> |hash:item_id| join2
  • A 基於 item_id 聚合, 計算相同類目的訂單數 (結果正確)
graph LR join2(join2) --> |hash:item_id| group(group count:order_id)
  • B 將join的資料 sink 至 MySQL (主鍵為 order_id) (結果錯誤)
graph LR join2(join2) --> |forward| sink(sink) sink --> |jdbc send| MySQL(MySQL primarykey:order_id)
  • C 將join的資料 sink 至 MySQL (主鍵為 order_id, item_id, item_name) (結果正確)
graph LR join2(join2) --> |forward| sink(sink) sink --> |jdbc send| MySQL(MySQL primarykey:order_id)
  • D 將 join 的資料 sink 至 kafka, 下游消費 kafka 資料並進行去重處理, 下游處理時,又可以分為兩種情況。
    • D-1 按 order_id 分割區並去重 (結果錯誤)
    • D-2 按 order_id, item_id, item_name 分割區並去重 (結果正確)
graph TD join2(join2) --> |forward| sink(sink) sink --> |kafka client| kafka(Kafka fixed partitioner) kafka --> |hash:order_id| rank(rank orderby:proctime) rank --> |"hash:date_format(order_time, 'yyyy-MM-dd')"| group("group agg:sum(item_price)")

graph TD join2(join2) --> |forward| sink(sink key) sink --> |kafka client| kafka(Kafka fixed partitioner) kafka --> |hash:order_id+item_id+item_name| rank(rank orderby:proctime) rank --> |"hash:date_format(order_time, 'yyyy-MM-dd')"| group("group agg:sum(item_price)")

從上面 A, B, C, D-1, D-2 這四種 case, 我們不難發現, 什麼情況下會導致錯誤的結果, 什麼情況下不會導致錯誤的結果, 關鍵還是要看每個 task 之間的 hash 規則。

case B 產生亂序主要原因時在 sink operator, hash的條件由原來的 order_id+item_id_item_name 變成了 order_id
case D-1 產生亂序主要發生在去重的 operator, hash 的規則由原來的 order_id+item_id+item_name 變為了 order_id

我們大概能總結以下幾點經驗

  • Flink 框架在可以保證 operator 和 operator hash 時, 一定是可以保證具有相同 hash 值的資料的在兩個 operator 之間傳輸順序性
  • Flink 框架無法保證資料連續多個 operator hash 的順序, 當 operator 和 operator 之間的 hash 條件發生變化, 則有可能出現資料的順序性問題。
  • 當 hash 條件由少變多時, 不會產生順序問題, 當 hash 條件由多變少時, 則可能會產生順序問題。

總結

大多數業務都是拿著原來的實時任務, 核心邏輯不變,只是把原來的 Hive 替換成 訊息佇列的 Source 表, 這樣跑出來的結果,一般情況下就很難和離線對上,雖然流批一體是 Flink 的優勢, 但對於某些 case , 實時的結果和離線的結果還是會產生差異, 因此我們在編寫 FlinkSQL 程式碼時, 一定要確保資料的準備性, 在編寫程式碼時,一定要知道我們的資料大概會產生怎樣的流動, 產生怎樣的結果, 這樣寫出來的邏輯才是符合預期的。