在某些業務場景下,我們需要一個標誌來衡量hudi資料寫入的進度,比如:Flink 實時向 Hudi 表寫入資料,然後使用這個 Hudi 表來支援批次計算並通過一個 flag 來評估它的分割區資料是否完整從而進一步寫入分割區資料進行分割區級別的ETL,這也就是我們通常說的流轉批
。
圖中Flink Sink包含了兩個運算元。第一個writer 運算元,它負責把資料寫入檔案,writer在checkpoint觸發時,會把自己寫入的最大的一個時間傳到commit運算元中,然後commit運算元從多個上游傳過來的時間中選取一個最小值作為這一批提交資料的時間,並寫入HUDI表的後設資料中。
我們的方案是將這個進度值(EventTime)儲存為 hudi 提交(版本)後設資料的屬性裡,然後通過存取這個後設資料屬性獲取這個進度值。在下游的批次處理任務之前加一個監控任務去監控最新快照後設資料。如果它的時間已經超過了當前的分割區時間,就認為這個表的資料已經完備了,這個監控任務就會成功觸發下游的批次處理任務進行計算,這樣可以防止在異常場景下資料管道或者批次處理任務空跑的情況。
下圖是一個flink 1分鐘級別入庫到HUDI ODS表, 然後通過流轉批計算寫入HUDI DWD表的一個執行過程。
如何解決亂序到來問題, 我們可以通過設定spedGapTime來設定允許延遲到來的範圍預設是0 不會延遲到來。
針對此功能特性的Hudi依賴版本如下
<dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.13-bundle</artifactId>
<version>0.12.1</version>
</dependency>
</dependencies>
<dependencies>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-flink1.15-bundle</artifactId>
<version>0.12.1</version>
</dependency>
</dependencies>
能夠解析的欄位型別及格式如下:
型別 | 範例 |
---|---|
TIMESTAMP(3) | 2012-12-12T12:12:12 |
TIMESTAMP(3) | 2012-12-12 12:12:12 |
DATE | 2012-12-12 |
BIGINT | 100L |
INT | 100 |
使用者只需要設定flink conf指定時間欄位作為時間推進欄位
Map<String, String> options = new HashMap<>();
// 這裡省略其他表欄位
options.put(FlinkOptions.EVENT_TIME_FIELD.key(), "ts");
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
.column("id int not null")
.column("ts string")
.column("dt string")
.pk("id")
.partition("dt")
.options(options);
通過設定hoodie.payload.event.time.field
指定需要計算的eventtime的欄位
create table hudi_cow_01(\n" +
" uuid varchar(20),\n" +
" name varchar(10),\n" +
" age int,\n" +
" ts timestamp(3),\n" +
" PRIMARY KEY(uuid) NOT ENFORCED\n" +
")\n" +
" with (\n" +
// 這裡省略其他引數
" 'hoodie.payload.event.time.field' = 'ts'\n"
")
call show_commit_extra_metadata(table => 'hudi_tauth_test.hudi_cow_01', metadata_key => 'hoodie.payload.event.time.field');
程式碼獲取片段如下
Option<HoodieCommitMetadata> commitMetadataOption = MetadataConversionUtils.getHoodieCommitMetadata(metaClient, currentInstant);
if (!commitMetadataOption.isPresent()) {
throw new HoodieException(String.format("Commit %s not found commitMetadata in Commits %s.", currentInstant, timeline));
}
// 獲取到當前版本的時間進度
String eventTime = commitMetadataOption.get().getExtraMetadata().get(FlinkOptions.EVENT_TIME_FIELD.key());
System.out.println("current eventTime: " + eventTime);
輸出結果如下
current eventTime: 1667971364742
PS:如果您覺得閱讀本文對您有幫助,請點一下「推薦」按鈕,您的「推薦」,將會是我不竭的動力!
作者:leesf 掌控之中,才會成功;掌控之外,註定失敗。
出處:http://www.cnblogs.com/leesf456/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連線,否則保留追究法律責任的權利。
如果覺得本文對您有幫助,您可以請我喝杯咖啡!