Hudi 在 vivo 湖倉一體的落地實踐

2023-12-14 12:00:38

作者:vivo 網際網路巨量資料團隊 - Xu Yu

在增效降本的大背景下,vivo巨量資料基礎團隊引入Hudi元件為公司業務部門湖倉加速的場景進行賦能。主要應用在流批同源、實時鏈路優化及寬表拼接等業務場景。

一、Hudi 基礎能力及相關概念介紹

1.1 流批同源能力

與Hive不同,Hudi資料在Spark/Flink寫入後,下游可以繼續使用Spark/Flink引擎以流讀的形式實時讀取資料。同一份Hudi資料來源既可以批讀也支援流讀。

Flink、Hive、Spark的流轉批架構:

圖片

Hudi流批同源架構:

圖片

1.2 COW和MOR的概念

Hudi支援COW(Copy On Write)和MOR(Merge On Read)兩種型別:

(1)COW寫時拷貝

每次更新的資料都會拷貝一份新的資料版本出來,使用者通過最新或者指定version的可以進行資料查詢。缺點是寫入的時候往往會有寫記憶體放大的情況,優點是查詢不需要合併,直接讀取效率相對比較高。JDK中的CopyOnWriteArrayList/CopyOnWriteArraySet 容器正是採用了 COW 思想。

COW表的資料組織格式如下:

圖片

(2)MOR讀時合併:

每次更新或者插入新的資料時,並寫入parquet檔案,而是寫入Avro格式的log檔案中,資料按照FileGroup進行分組,每個FileGroup由base檔案(parquet檔案)和若干log檔案組成,每個FileGroup有單獨的FileGroupID;在讀取的時候會在記憶體中將base檔案和log檔案進行合併,進而返回查詢的資料。缺點是合併需要花費額外的合併時間,查詢的效率受到影響;優點是寫入的時候效率相較於COW快很多,一般用於要求資料快速寫入的場景。

MOR資料組織格式如下:

圖片

1.3 Hudi的小檔案治理方案

Hudi表會針對COW和MOR表制定不同的檔案合併方案,分別對應Clustering和Compaction。

Clustering顧名思義,就是將COW表中多個FileGroup下的parquet根據指定的資料大小重新編排合併為新的且檔案體積更大的檔案塊。如下圖所示:

圖片

Compaction即base parquet檔案與相同FileGroup下的其餘log檔案進行合併,生成最新版本的base檔案。如下圖所示:

圖片

1.4 周邊引擎查詢Hudi的原理

當前主流的OLAP引擎等都是從HMS中獲取Hudi的分割區後設資料資訊,從InputFormat屬性中判斷需要啟動HiveCatalog還是HudiCatalog,然後生成查詢計劃最終執行。當前StarRocks、Presto等引擎都支援以外表的形式對Hudi表進行查詢。

圖片

1.5 Procedure介紹

Hudi 支援多種Procedure,即過程處理程式,使用者可以通過這些Procedure方便快速的處理Hudi表的相關邏輯,比如Compaction、Clustering、Clean等相關處理邏輯,不需要進行編碼,直接通過sparksql的語句來執行。

1.6 專案架構

1. 按時效性要求進行分類

秒級延遲:

圖片

分鐘級延遲:

圖片

當前Hudi主要還是應用在準實時場景

上游從Kafka以append模式接入ods的cow表,下游部分dw層業務根據流量大小選擇不同型別的索引表,比如bucket index的mor表,在資料去重後進行dw構建,從而提供統一資料服務層給下游的實時和離線的業務,同時ods層和dw層統一以insert overwrite的方式進行分割區級別的容災保障,Timeline上寫入一個replacecommit的instant,不會引發下游流量驟增,如下圖所示:

圖片

1.7 線上達成能力

實時場景:

支援1億條/min量級準實時寫入;流讀延遲穩定在分鐘級

離線場景:

支援千億級別資料單批次離線寫入;查詢效能與查詢Hive持平(部分線上任務較查詢Hive提高20%以上)

小檔案治理:

95%以上的合併任務單次執行控制在10min內完成

二、元件能力優化

2.1 元件版本

當前線上所有Hudi的版本已從0.12 升級到 0.14,主要考慮到0.14版本的元件能力更加完備,且與社群前沿動態保持一致。

2.2 流計算場景

1. 限流

資料積壓嚴重的情況下,預設情況會消費所有未消費的commits,往往因消費的commits數目過大,導致任務頻繁OOM,影響任務穩定性;優化後每次使用者可以攝取指定數目的commits,很大程度上避免任務OOM,提高了任務穩定性。

圖片

2. 外接clean運算元

避免單並行度的clean運算元最終階段影響資料實時寫入的效能;將clean單獨剝離到

compaction/clustering執行。這樣的好處是單個clean運算元,不會因為其生成clean計劃和執行導致區域性某些Taskmanager出現熱點的問題,極大程度提升了實時任務穩定性。

圖片

3. JM記憶體優化

部分大流量場景中,儘管已經對Hudi進行了最大程度的調優,但是JM的記憶體仍然在較高水位波動,還是會間隔性出現記憶體溢位影響穩定性。這種情況下我們嘗試對 state.backend.fs.memory-threshold 引數進行調整;從預設的20KB調整到1KB,JM記憶體顯著下降;同時執行至今state相關資料未產生小檔案影響。

圖片

2.3 批計算場景

1. Bucket index下的BulkInsert優化

0.14版本後支援了bucket表的bulkinsert,實際使用過程中發現分割區數很大的情況下,寫入延遲耗時與計算資源消耗較高;分析後主要是開啟的控制程式碼數較多,不斷CPU IO 頻繁切換影響寫入效能。

因此在hudi核心進行了優化,主要是基於partition path和bucket id組合進行預排序,並提前關閉空閒寫入控制程式碼,進而優化cpu資源使用率。

這樣原先50分鐘的任務能降低到30分鐘以內,資料寫入效能提高約30% ~ 40%。

優化前:

圖片

優化後:

圖片

2. 查詢優化

0.14版本中,部分情況下分割區裁剪會失效,從而導致條件查詢往往會掃描不相關的分割區,在分割區數龐大的情況下,會導致driver OOM,對此問題進行了修復,提高了查詢任務的速度和穩定性。

eg:select * from `hudi_test`.`tmp_hudi_test` where day='2023-11-20' and hour=23; 

(其中tmp_hudi_test是一張按日期和小時二級分割區的表)

修復前:

圖片

修復後:

圖片

優化後不僅包括減少分割區的掃描數目,也減少了一些無效檔案RPC的stage。

3. 多種OLAP引擎支援

此外,為了提高MOR表管理的效率,我們禁止了RO/RT表的生成;同時修復了原表的後設資料不能正常同步到HMS的缺陷(這種情況下,OLAP引擎例如Presto、StarRocks查詢原表資料預設僅支援對RO/RT表的查詢,原表查詢為空結果)。

圖片

2.4 小檔案合併

1. 序列化問題修復

0.14版本Hudi在檔案合併場景中,Compaction的效能相較0.12版本有30%左右的資源優化,比如:原先0.12需要6G資源才能正常啟動單個executor的場景下,0.14版本 4G就可以啟動並穩定執行任務;但是clustering存在因TypedProperties重複序列化導致的效能缺陷。完善後,clustering的效能得到30%以上的提升。

可以從executor的修復前後的火焰圖進行比對。

修復前:

圖片

修復後:

圖片

2. 分批compaction/clustering

compaction/clustering預設不支援按commits數分批次執行,為了更好的相容平臺排程能力,對compaction/clustering相關procedure進行了改進,支援按批次執行。

同時對其他部分procedure也進行了優化,比如copy_to_table支援了列裁剪拷貝、delete_procedures支援了批次執行等,降低sparksql的執行時間。

3. clean優化

Hudi0.14 在多分割區表的場景下clean的時候很容易OOM,主要是因為構建HoodieTableFileSystemView的時候需要頻繁存取TimelineServer,因產生大量分割區資訊請求物件導致記憶體溢位。具體情況如下:

圖片

對此我們對partition request Job做了相關優化,將多個task分為多個batch來執行,降低對TimelineSever的記憶體壓力,同時增加了請求前的快取判斷,如果已經快取的將不會發起請求。

改造後如下:

圖片

此外實際情況下還可以在FileSystemViewManager構建過程中將 remoteview 和 secondview 的順序互調,絕大部分場景下也能避免clean oom的問題,直接優先從secondview中獲取分割區資訊即可。

2.5 生命週期管理

當前計算平臺支援使用者表級別生命週期設定,為了提高刪除的效率,我們設計實現了直接從目錄對資料進行刪除的方案,這樣的收益有:

  1. 降低了後設資料互動時間,執行時間快;

  2. 無須加鎖、無須停止任務;

  3. 不會影響後續compaction/clustering 相關任務執行(比如執行合併的時候不會報檔案不存在等異常)。

刪除前會對compaction/clustering等instants的後設資料資訊進行掃描,經過合法性判斷後區分使用者需要刪除的目錄是否存在其中,如果有就儲存;否則直接刪除。流程如下:

圖片

三、總結

我們分別在流批場景、小檔案治理、生命週期管理等方向做了相關優化,上線後的收益主要體現這四個方向:

  1. 部分實時鏈路可以進行合併,降低了計算和儲存資源成本;

  2. 基於watermark有效識別分割區寫入的完成度,接入湖倉的後續離線任務平均SLA提前時間不低於60分鐘;

  3. 部分流轉批後的任務上線後執行時間減少約40%(比如原先執行需要150秒的任務可以縮短到100秒左右完成 ;

  4. 離線增量更新場景,部分任務相較於原先Hive任務可以下降30%以上的計算資源。

同時跟進使用者實際使用情況,發現了一些有待優化的問題:

  1. Hudi生成檔案的體積相較於原先Hive,體積偏大(平均有1.3 ~ 1.4的比例);

  2. 流讀的指標不夠準確;

  3. Hive—>Hudi遷移需要有一定的學習成本;

針對上述問題,我們也做了如下後續計劃:

  1. 對hoodie parquet索引檔案進行精簡優化,此外業務上對主鍵的重新設計也會直接影響到檔案體積大小;

  2. 部分流讀的指標不準,我們已經完成初步的指標修復,後續需要補充更多實時的任務指標來提高使用者體驗;

  3. 完善Hudi遷移流程,提供更快更簡潔的遷移工具,此外也會向更多的業務推廣Hudi元件,進一步挖掘Hudi元件的潛在使用價值。