文中部分程式碼對應 0.14.0 版本
初始的需求是Uber公司會有很多記錄級別的更新場景,Hudi 在Uber 內部主要的一個場景,就是乘客打車下單和司機接單的匹配,乘客和司機分別是兩條資料流,通過 Hudi 的 Upsert 能力和增量讀取功能,可以分鐘級地將這兩條資料流進行拼接,得到乘客-司機的匹配資料。
為了提升更新的時效性,因此提出了一套新的框架作為近實時的增量的解決方案
從名字Hadoop Upsert and Incremental
也可以看出hudi的主要功能是upsert 和 incremental 的能力,架在Hadoop之上。
https://hudi.apache.org/cn/docs/indexing
主要是通過索引技術來實現高效的upsert和delete。通過索引可以將一條記錄的Hoodie key (record key)對映到一個檔案id,然後根據表的型別,以及寫入資料的型別,來決定更新和刪除輸入的插入方式。
索引型別
索引的型別還分為global 和 非 global 兩種,BloomFilter Index和 Simple Index這兩種有global的選項,hbase天然就是global的選項,global index會保障全域性分割區下鍵的唯一性,代價會更高。
Odps/MaxCompute也支援更新刪除
https://help.aliyun.com/document_detail/205825.html
也是用過base file + delta log的思路來實現
Hive3.0 也支援更新刪除和ACID語意
https://www.adaltas.com/en/2019/07/25/hive-3-features-tips-tricks/
https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions
差別在於hudi是支援了資料表的upsert,也就是能在寫入時就保證資料主鍵的唯一性,而odps 和 hive應該只是支援了通過update 和 delete dml語句來更新資料,覆蓋場景不同。後者應該主要只是在資料訂正的場景,作為入湖的選型還是需要天然支援upsert才行。
我認為事務支援是hudi中最核心的部分,因為資料的更新刪除都強依賴事務的能力,傳統數倉中只提供insert語意並且檔案只能追加,對事務保障的需求會弱很多,最多就是讀到了不完整的資料(寫入分割區資料後還發生append)。
但是當需要支援update和delete語意時,對事務的保障的需求就會強很多,所以可以看到hive和odps中想要開啟表的更新和刪除能力,首先需要開啟表的事務屬性。
hudi中事務的實現
**MVCC **通過mvcc機制實現多writer和reader之間的快照隔離
OCC 樂觀並行控制
預設hudi是認為單writer寫入的,這種情況下吞吐是最大的。如果有多writer,那麼需要開啟多writer的並行控制
hoodie.write.concurrency.mode=optimistic_concurrency_control
# 指定鎖的實現 預設是基於filesystem 的鎖機制(要求filesystem能提供原子性的建立和刪除保障)
hoodie.write.lock.provider=<lock-provider-classname>
支援檔案粒度的樂觀並行控制,在寫入完成commit時,如果是開啟了occ,那麼會先獲取鎖,然後再進行commit。看起來這個鎖是全域性粒度的一把鎖,以filesystem lock為例
commit 流程
protected void autoCommit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<O> result) {
final Option<HoodieInstant> inflightInstant = Option.of(new HoodieInstant(State.INFLIGHT,
getCommitActionType(), instantTime));
// 開始事務,如果是occ並行模型,會獲取鎖
this.txnManager.beginTransaction(inflightInstant,
lastCompletedTxn.isPresent() ? Option.of(lastCompletedTxn.get().getLeft()) : Option.empty());
try {
setCommitMetadata(result);
// reload active timeline so as to get all updates after current transaction have started. hence setting last arg to true.
// 嘗試解衝突,衝突判定的策略是可插拔的,預設是變更的檔案粒度檢視是否有交集. 目前衝突的檔案更改是無法處理的,會終止commit請求
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
result.getCommitMetadata(), config, this.txnManager.getLastCompletedTransactionOwner(), true, pendingInflightAndRequestedInstants);
commit(extraMetadata, result);
} finally {
this.txnManager.endTransaction(inflightInstant);
}
}
鎖獲取流程
@Override
public boolean tryLock(long time, TimeUnit unit) {
try {
synchronized (LOCK_FILE_NAME) {
// Check whether lock is already expired, if so try to delete lock file
// 先檢查lock file 是否存在,預設路徑是 base/.hoodie/lock 也就是所有的commit操作都會操作這個檔案
if (fs.exists(this.lockFile)) {
if (checkIfExpired()) {
fs.delete(this.lockFile, true);
LOG.warn("Delete expired lock file: " + this.lockFile);
} else {
reloadCurrentOwnerLockInfo();
return false;
}
}
// 如果檔案不存在,則獲取鎖,建立檔案
acquireLock();
return fs.exists(this.lockFile);
}
} catch (IOException | HoodieIOException e) {
// 建立時可能會發生失敗,則返回false獲取鎖失敗
LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
return false;
}
}
如果兩個寫入請求修改的檔案沒有重疊,在resolveConflict階段直接通過,如果有重疊,那麼後提交的寫入會失敗並回滾。
Base File是儲存Hudi資料集的主體檔案,以Parquet等列式格式儲存。格式為
<fileId>_<writeToken>_<instantTime>.parquet
Log File是在MOR表中用於儲存變化資料的檔案,也常被稱作Delta Log,Log File不會獨立存在,一定會從屬於某個Parquet格式的Base File,一個Base File和它從屬的若干Log File所構成的就是一個File Slice。
.<fileId>_<baseCommitTime>.log.<fileVersion>_<writeToken>
File Slice,在MOR表裡,由一個Base File和若干從屬於它的Log File組成的檔案集合被稱為一個File Slice。File Slice是針對MOR表的特定概念,對於COW表來說,由於它不生成Log File,所以File Silce只包含Base File,或者說每一個Base File就是一個獨立的File Silce。
FileId相同的檔案屬於同一個File Group。同一File Group下往往有多個不同版本(instantTime)的Base File(針對COW表)或Base File + Log File的組合(針對MOR表),當File Group內最新的Base File迭代到足夠大( >100MB)時,Hudi就不會在當前File Group上繼續追加資料了,而是去建立新的File Group。
這裡面可以看到根據大小上下限來決定是否建立新的File Group在hudi中叫自適應的file sizing。這裡其實就是在partition的粒度下建立了更小粒度的group. 類似於Snowflake中的micro partition技術。這個對於行級別的更新是很友好的,不管是cow還是mor表都減少了更新帶來的重寫資料的範圍。
多種查詢型別
Hudi預設支援了寫入表的後設資料管理,metadata 也是一張MOR的hoodie表. 初始的需求是為了避免頻繁的list file(分散式檔案系統中這一操作通常很重)。Metadata是以HFile的格式儲存(Hbase儲存格式),提供高效的kv點查效率
Metadata 相關功能的設定org.apache.hudi.common.config.HoodieMetadataConfig
提供了哪些後設資料?
hoodie.metadata.index.bloom.filter.enable
儲存資料檔案的bloom filter indexhoodie.metadata.index.column.stats.enable
儲存資料檔案的column 的range 用於裁剪優化flink data skipping支援: https://github.com/apache/hudi/pull/6026
Catalog 支援 基於dfs 或者 hive metastore 來構建catalog 來管理所有在hudi上的表的後設資料
CREATE CATALOG hoodie_catalog
WITH (
'type'='hudi',
'catalog.path' = '${catalog default root path}',
'hive.conf.dir' = '${directory where hive-site.xml is located}',
'mode'='hms' -- supports 'dfs' mode that uses the DFS backend for table DDLs persistence
);
schema evolution, clustering,clean, file sizing..
https://hudi.apache.org/cn/docs/write_operations
外掛支援多種寫入模式, 參見org.apache.hudi.table.HoodieTableSink#getSinkRuntimeProvider
。常見的有
https://hudi.apache.org/cn/docs/hoodie_deltastreamer
BULK_INSERT
, bulk insert 模式通常是用來批次匯入資料,
每次寫入資料RowData時,會同時更新bloom filter索引(將record key 新增到bloom filter 中). 在一個parquet檔案寫完成之後,會將構建的bloom filter資訊序列化成字串, 以及此檔案的key range,序列化後儲存到file footer中(在沒開啟bloom filter索引時也會做這一步).
public Map<String, String> finalizeMetadata() {
HashMap<String, String> extraMetadata = new HashMap<>();
extraMetadata.put(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, bloomFilter.serializeToString());
if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) {
extraMetadata.put(HOODIE_BLOOM_FILTER_TYPE_CODE, bloomFilter.getBloomFilterTypeCode().name());
}
if (minRecordKey != null && maxRecordKey != null) {
extraMetadata.put(HOODIE_MIN_RECORD_KEY_FOOTER, minRecordKey.toString());
extraMetadata.put(HOODIE_MAX_RECORD_KEY_FOOTER, maxRecordKey.toString());
}
return extraMetadata;
}
Append Mode
: 僅只有Insert的資料
Upsert
:
BootstrapOperator
用於基於已經存在的hoodie錶的歷史資料集,構建初始的index索引(可選)通過引數index.bootstrap.enabled
開啟,預設為false。載入過程會可能會比較慢,開啟的情況下需要等到所有task都載入完成才能處理資料。這個載入需要獲取所有分割區的 索引,載入到state中. 這個理論上是需要讀取metadata列 _hoodie_record_key
和 _hoodie_partition_path
然後構建出IndexRecord,所以會很慢。BucketAssignFunction
計算資料應該落到哪個bucket(file group)去, 感覺bucket這個詞和bucket index有點衝突,這裡是兩個概念,這裡主要還是劃分資料所屬哪個file,這一步就會用到前面構建的索引,所以預設情況下flink的索引是基於state的// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
HoodieRecordGlobalLocation oldLoc = indexState.value();
// change records 表示會更改資料的寫入型別如update,delete
if (isChangingRecords && oldLoc != null) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
// 打標之後如果partition 發生變化了,例如partition 欄位發生了變化 ? 狀態中儲存的就是這個資料應該存放的location
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
if (globalIndex) {
// if partition path changes, emit a delete record for old partition path,
// then update the index state using location with new partition path.
// 對於全域性索引,需要先刪除老的分割區的資料,非全域性索引不做跨分割區的改動
HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
deleteRecord.unseal();
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
location = getNewRecordLocation(partitionPath);
} else {
location = oldLoc.toLocal("U");
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
location = getNewRecordLocation(partitionPath);
}
可以看到在BucketAssigner這一步就已經確定了record 已經落到哪個fileid中(也就是打標的過程),所以預設就走的是基於state的索引。 在這裡org.apache.hudi.table.action.commit.FlinkWriteHelper#write
區別於org.apache.hudi.table.action.commit.BaseWriteHelper#write
。好處就是不用像BloomFilter 索引去讀取檔案key 以及並且沒有假陽的問題,壞處就是需要在寫入端通過state來維護索引。除了預設基於State索引的方式, Flink 也支援BucketIndex。
總體感覺,索引的實現比較割裂,交由各個引擎的實現端來完成。而且流式寫入依賴內部狀態索引可能穩定性的問題。
總的來看,hudi的核心價值有
端到端資料延遲降低
在傳統基於 Hive 的 T + 1 更新方案中,只能實現天級別的資料新鮮度,取決於partition的粒度。因為在傳統離線數倉裡面只能按照partition粒度commit,因為無法將paritition做到特別細粒度,檔案管理的壓力會很大,最多可能到小時,30min,那麼下游排程就只能按這個粒度來排程計算。而hudi裡面基於事務就可以非常快速的commit,並提供commit 之後的增量語意,那麼就可以加速離線資料處理pipeline。
高效的Upsert
不用每次都去 overwrite 整張表或者整個 partition 去更新,而是能夠精確到檔案粒度的區域性更新來提升儲存和計算效率。
而這兩者都是以ACID事務作為保障。因此Hudi的名字取的很好,基本把他的核心功能都說出來了。
https://github.com/leesf/hudi-resources hudi resources
https://github.com/apache/hudi/tree/master/rfc hudi rfcs
https://www.liaojiayi.com/lake-hudi/ hudi 核心概念解讀
https://cwiki.apache.org/confluence/display/HUDI/RFC+-+29%3A+Hash+Index hash 索引設計
https://stackoverflow.com/questions/19128940/what-is-the-difference-between-partitioning-and-bucketing-a-table-in-hive bucket in hive
https://www.cnblogs.com/leesf456/p/16990811.html 一文聊透hudi 索引機制
https://github.com/apache/hudi/blob/master/rfc/rfc-45/rfc-45.md async metadata indexing rfc
https://mp.weixin.qq.com/s/Moehs1Ch3j7IVANJQ1mfNw Apache Hudi重磅RFC解讀之記錄級別全域性索引
https://blog.csdn.net/weixin_47482194/article/details/116357831 MOR表的檔案結構分析
https://juejin.cn/post/7160589518440153096 實時資料湖 Flink Hudi 實踐探索
https://segmentfault.com/a/1190000041471105 hudi Bucket index
https://mp.weixin.qq.com/s/n_Kd6FhWs4_QZN_gmAuPhw file layouts
https://mp.weixin.qq.com/s?__biz=MzIyMzQ0NjA0MQ== file sizing
https://mp.weixin.qq.com/s/Te2zaF6AoJuTxY8ILzxlQg Clustering
https://docs.snowflake.com/en/user-guide/tables-clustering-micropartitions snowflake micropartition
https://cloud.tencent.com/developer/article/1827930 17張圖帶你徹底理解Hudi Upsert原理
https://hudi.apache.org/cn/docs/concurrency_control/ 並行控制
https://www.infoq.cn/article/Pe9ejRJDrJsp5AIhjlE3 物件儲存
https://www.striim.com/blog/data-warehouse-vs-data-lake-vs-data-lakehouse-an-overview/ data lake vs data warehouse vs lake house
本文來自部落格園,作者:Aitozi,轉載請註明原文連結:https://www.cnblogs.com/Aitozi/p/17373573.html