Apache Hudi 流轉批 場景實踐

2023-02-19 12:00:37

背景

在某些業務場景下,我們需要一個標誌來衡量hudi資料寫入的進度,比如:Flink 實時向 Hudi 表寫入資料,然後使用這個 Hudi 表來支援批次計算並通過一個 flag 來評估它的分割區資料是否完整從而進一步寫入分割區資料進行分割區級別的ETL,這也就是我們通常說的流轉批

EventTime計算原理

圖中Flink Sink包含了兩個運算元。第一個writer 運算元,它負責把資料寫入檔案,writer在checkpoint觸發時,會把自己寫入的最大的一個時間傳到commit運算元中,然後commit運算元從多個上游傳過來的時間中選取一個最小值作為這一批提交資料的時間,並寫入HUDI表的後設資料中。

案例使用

我們的方案是將這個進度值(EventTime)儲存為 hudi 提交(版本)後設資料的屬性裡,然後通過存取這個後設資料屬性獲取這個進度值。在下游的批次處理任務之前加一個監控任務去監控最新快照後設資料。如果它的時間已經超過了當前的分割區時間,就認為這個表的資料已經完備了,這個監控任務就會成功觸發下游的批次處理任務進行計算,這樣可以防止在異常場景下資料管道或者批次處理任務空跑的情況。

下圖是一個flink 1分鐘級別入庫到HUDI ODS表, 然後通過流轉批計算寫入HUDI DWD表的一個執行過程。

US排程系統輪詢邏輯

如何解決亂序到來問題,  我們可以通過設定spedGapTime來設定允許延遲到來的範圍預設是0 不會延遲到來。

Maven pom 依賴

針對此功能特性的Hudi依賴版本如下


<dependencies>
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.13-bundle</artifactId>
    <version>0.12.1</version>
  </dependency>
</dependencies>

<dependencies>
  <dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink1.15-bundle</artifactId>
    <version>0.12.1</version>
  </dependency>
</dependencies>

如何設定EventTime

能夠解析的欄位型別及格式如下:

型別 範例
TIMESTAMP(3) 2012-12-12T12:12:12
TIMESTAMP(3) 2012-12-12 12:12:12
DATE 2012-12-12
BIGINT 100L
INT 100

使用者只需要設定flink conf指定時間欄位作為時間推進欄位

Map<String, String> options = new HashMap<>();
// 這裡省略其他表欄位
options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
     .column("id int not null")
     .column("ts string")
     .column("dt string")
     .pk("id")
     .partition("dt")
     .options(options);

通過設定hoodie.payload.event.time.field指定需要計算的eventtime的欄位

create table hudi_cow_01(\n" +
"  uuid varchar(20),\n" +
"  name varchar(10),\n" +
"  age int,\n" +
"  ts timestamp(3),\n" +
"  PRIMARY KEY(uuid) NOT ENFORCED\n" +
")\n" +
" with (\n" +
 // 這裡省略其他引數
"  'hoodie.payload.event.time.field' = 'ts'\n"
")

如何讀取EventTime

Spark SQL

call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');

Java API

程式碼獲取片段如下

Option<HoodieCommitMetadata> commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant);
if (!commitMetadataOption.isPresent()) {
    throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline));
}
// 獲取到當前版本的時間進度
String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key());
System.out.println("current eventTime: " + eventTime);

輸出結果如下

current eventTime: 1667971364742