在醫療場景下,涉及到的業務庫有幾十個,可能有上萬張表要做實時入湖,其中還有某些庫的表結構修改操作是通過業務人員在網頁手工實現,自由度較高,導致整體上存在非常多的新增列,刪除列,改列名的情況。由於Apache Hudi 0.9.0 版本到 0.11.0 版本之間只支援有限的schema變更,即新增列到尾部的情況,且使用者對資料質量要求較高,導致了非常高的維護成本。每次刪除列和改列名都需要重新匯入,這種情況極不利於長期發展,所以需要一種能夠以較低成本支援完整schema演變的方案。
在 https://hudi.apache.org/docs/schema_evolution 中提到:schema演化允許使用者輕鬆更改 Apache Hudi 表的當前 Schema,以適應隨時間變化的資料。從 0.11.0 版本開始,已新增 Spark SQL(Spark 3.1.x、3.2.1 及更高版本)對 Schema 演化的 DDL 支援並處於試驗階段。
為此我們針對該功能進行了相關測試和調研工作。
回顧Apache Hudi 對schema演變的支援隨著版本迭代的變化如下:
版本 | Schema演變支援 | 多引擎查詢 |
---|---|---|
*<0.9 | 無 | 無 |
0.9<* | 在最後的根級別新增一個新的可為空列 | 是(全) |
向內部結構新增一個新的可為空列(最後) | 是(全) | |
新增具有預設值的新複雜型別欄位(地圖和陣列) | 是(全) | |
新增自定義可為空的 Hudi 元列,例如_hoodie_meta_col |
是(全) | |
為根級別的欄位改變資料型別從 int 到long |
是(全) | |
將巢狀欄位資料型別從int 到long |
是(全) | |
將複雜型別(對映或陣列的值)資料型別從int 到long |
是(全) | |
0.11<* | 相比之前版本新增:改列名 | spark以外的引擎不支援 |
相比之前版本新增:刪除列 | spark以外的引擎不支援 | |
相比之前版本新增:移動列 | spark以外的引擎不支援 |
Apache Hudi 0.11.0版本完整Schema演變支援的型別修改如下:
Source\Target | long | float | double | string | decimal | date | int |
---|---|---|---|---|---|---|---|
int | Y | Y | Y | Y | Y | N | Y |
long | Y | N | Y | Y | Y | N | N |
float | N | Y | Y | Y | Y | N | N |
double | N | N | Y | Y | Y | N | N |
decimal | N | N | N | Y | Y | N | N |
string | N | N | N | Y | Y | Y | N |
date | N | N | N | Y | N | Y | N |
實踐中0.9.0版本的新增列未發現問題,已在正式環境使用。每次寫入前捕獲是否存在新增列刪除列的情況,新增列的情況及時補空資料和struct,新增列的資料及時寫入Hudi中;刪除列則資料補空,struct不變,刪除列仍寫入Hudi中;每天需要重導資料處理刪除列和修改列的情況,有變化的表在Hive中的後設資料也以天為單位重新註冊。
0.11開始的方式,按照官網的步驟:
進入spark-sql
# Spark SQL for spark 3.1.x
spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# Spark SQL for spark 3.2.1 and above
spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.11.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
設定引數,刪列:
set hoodie.schema.on.read.enable=true;
---建立表---
create table test_schema_change (
id string,
f1 string,
f2 string,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
);
---1.新增列---
alter table test_schema_change add columns (f3 string);
---2.刪除列---
alter table test_schema_change drop column f2;
---3.改列名---
alter table test_schema_change rename column f1 to f1_new;
由於spark-sql的支援只在spark3.1之後支援,尋找並嘗試了 BaseHoodieWriteClient.java 中存在名為 addColumn renameColumn deleteColumns 的幾個方法,通過主動呼叫這些方法,也能達到schema完整變更的目的。使用這種方式需要將DDL的sql解析為對應的方法和引數,另外由於該方式測試和使用的例子還比較少,存在一些細節問題需要解決。
val hsec = new HoodieSparkEngineContext(spark.sparkContext);
val hoodieCfg = HoodieWriteConfig.newBuilder().forTable(tableName).withEmbeddedTimelineServerEnabled(true).withPath(basePath).build()
val client = new SparkRDDWriteClient(hsec, hoodieCfg)
//增加列
client.addColumn("f3",Schema.create(Schema.Type.STRING))
//刪除列
client.deleteColumns("f1")
//改列名
client.renameColumn("f2","f2_c1")
其中核心的類為 org.apache.hudi.internal.schema.InternalSchema ,出自HUDI-2429,通過記錄包括順序的完整列資訊,並且每次變更都儲存歷史記錄,而非之前的只關注最新 org.apache.avro.Schema。
spark-sql的方式只支援Spark3.1、Spark3.2,分析如下:
此處以BaseHoodieWriteClient.java 中具體修改方法的實現邏輯,分析完整schema演變在寫入過程的支援。
注意:在一次資料寫入操作完成後的commit階段,會根據條件判斷,是否儲存 InternalSchema,關鍵條件為引數 hoodie.schema.on.read.enable
主動修改列的操作前,需要先存在歷史schema,否則會丟擲異常 "cannot find schema for current table: ${basepath}",因為metadata裡不存在SerDeHelper.LATEST_SCHEMA(latest_schema)
如圖所示,每次提交生成一份歷史的schema,位於${basePath}/.hoodie/.schema目錄下。
其中20220824202636627.schemacommit 內容:
{
"schemas": [
{
"max_column_id": 8,
"version_id": 20220824202636627,
"type": "record",
"fields": [
...
{
"id": 5,
"name": "id",
"optional": true,
"type": "string"
},
{
"id": 6,
"name": "f1",
"optional": true,
"type": "string"
},
{
"id": 7,
"name": "f2",
"optional": true,
"type": "string"
},
{
"id": 8,
"name": "ts",
"optional": true,
"type": "long"
}
]
}
]
}
期間新增了列f3後
20220824203337656.schemacommit 內容為:
{
"schemas": [
{
"max_column_id": 9,
"version_id": 20220824202940558,
"type": "record",
"fields": [
...
{
"id": 5,
"name": "id",
"optional": true,
"type": "string"
},
{
"id": 6,
"name": "f1",
"optional": true,
"type": "string"
},
{
"id": 7,
"name": "f2",
"optional": true,
"type": "string"
},
{
"id": 8,
"name": "ts",
"optional": true,
"type": "long"
},
{
"id": 9,
"name": "f3",
"optional": true,
"type": "string"
}
]
},
{
"max_column_id": 8,
"version_id": 20220824202636627,
"type": "record",
"fields": [
...
{
"id": 5,
"name": "id",
"optional": true,
"type": "string"
},
{
"id": 6,
"name": "f1",
"optional": true,
"type": "string"
},
{
"id": 7,
"name": "f2",
"optional": true,
"type": "string"
},
{
"id": 8,
"name": "ts",
"optional": true,
"type": "long"
}
]
}
]
}
其中max_column_id 為列id最大值,version_id 為版本號,也為instantTime。
存在 latest_schema 的情況如下所示:
主動呼叫 BaseHoodieWriteClient.java 類中相應方法的方式下,由於儲存歷史schema的邏輯上,a.開啟該功能引數(hoodie.schema.on.read.enable) && b.存在歷史schema的才能儲存歷史schema,在使用該功能之前或低於0.11版本的寫入升級到該版本,已經正在更新的hudi表,無法使用該功能。建議把條件a為真,b為假的情況,根據當前schema直接生成歷史schema
該處細節問題已經在HUDI-4276修復,0.12.0版本及以後不會有這個問題
hoodie.datasource.write.reconcile.schema 預設為false,如果要達到上述目的,改為true即可
大體流程如下:
1.總體流程為某個查詢進入dataSource中,選擇具體的relacation,獲取查詢schema,獲取scan
2.在scan中獲取每個基礎檔案或紀錄檔的資料塊對應的資料schema
3.在scan中獲取資料schema後與查詢schema進行merge,通過merge的schema來讀取具體資料
上圖中流程 **a **大體流程如下:
由於基礎檔案的命名方式和組織形式,基礎檔案的scan過程在HoodieParquetFileFormat中可以直接通過檔名獲取InstantTime:
在用於讀取和寫入hudi表DefaultSource中,createRelation方法按照引數建立對應的BaseRelation擴充套件子類
HoodieBaseRelation#buildScan中呼叫 composeRDD 方法,該方法分別在子類BaseFileOnlyRelation,MergeOnReadSnapshotRelation,MergeOnReadIncrementalRelation 中實現,
以MergeOnReadSnapshotRelation 即mor表的快照讀為例,在composeRDD 方法中呼叫父類別createBaseFileReader的方法,其中val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader,以SparkAdapterSupport的createHoodieParquetFileFormat建立ParquetFileFormat,
SparkAdapterSupport的三個子類分別為Spark2Adapter,Spark3_1Adapter和Spark3_2Adapter,以Spark3_1Adapter實現的方法為例
建立Spark31HoodieParquetFileFormat,其中buildReaderWithPartitionValues方法中,會通過FSUtils.getCommitTime獲取InstantTime
log檔案的檔名中的時間戳與提交 instantTime不一致,一個log檔案對應多次時間軸 instantTime 提交。
紀錄檔檔案的scan在AbstractHoodieLogRecordReader.java的的通過每個HoodieDataBlock的header中的 INSTANT_TIME 獲取對應的 instantTime
以MergeOnReadSnapshotRelation為例,在composeRDD中建立HoodieMergeOnReadRDD
在HoodieMergeOnReadRDD的compute方法中使用的LogFileIterator類及其子類中使用HoodieMergeOnReadRDD的scanLog方法
scanLog中建立HoodieMergedLogRecordScanner,建立時執行performScan() -> 其父類別AbstractHoodieLogRecordReader的scan(),
scan() -> scanInternal() -> processQueuedBlocksForInstant() 迴圈獲取雙端佇列的logBlocks -> processDataBlock() -> getMergedSchema()
在getMergedSchema方法中通過HoodieDataBlock的getLogBlockHeader().get(INSTANT_TIME)獲取InstantTime
根據InstantTime獲取時間軸提交檔案
如果能夠獲取,直接取其中extraMetadata中的latest_schema內容作為資料schema
如果不能獲取,在獲取最新的${basePath}/.hoodie/.schema/下的具體檔案後,通過檔案內容搜尋具體 InternalSchema找到最新的history
如果有InstantTime對應的versino_id,直接獲取
如果沒有InstantTime對應的versino_id,說明那次寫入無變化,從那次寫入前的最新一次獲取
輸入資料schema和查詢schema,和幾個布林屬性,獲得InternalSchemaMerger物件
遞迴呼叫mergeType方法處理查詢schema,首先進入RECORD,遍歷每個列,mergeType方法處理
略過複雜型別
基本型別中會進入buildPrimitiveType方法
根據輸入的id獲取資料schena的Type,如果沒有,就返回輸入的Type
將返回的Type加入名為 newTypes的Type列表,把newTypes和查詢schema的欄位列表的輸入buildRecordType方法
遍歷查詢schema的列,並用id和name獲取資料schema的列
如果id和name都一致,為改列型別,使用資料schema的型別
如果id相同,name不同,改列名,使用資料schema的名字
如果id不同,name相同,先刪後加,加字尾保證讀不到檔案內容
如果id不同,name不同,後來新增列
組裝返回merge後的schema
如下所示:
其中id為唯一標誌性,
id=0的query裡改名為f1v1,merge後為f1,
id=1的query裡刪除,merge裡也沒有,
id=2的query裡為long型,files裡為int型,merge裡為long型
id=3的query裡新增,返回query的欄位
id=4的query裡name為f1,對應file裡的name為f1的id為0,所以merge裡id為4,name為 ("f1"+"suffix")
測試的Spark版本 > spark3.1且 hoodie.schema.on.read.enable=true
完全支援
否則測試結果如下:
操作型別 | 是否支援 | 原因 |
---|---|---|
新增列 | 是 | 按列名查詢,沒有的列返回null |
刪除列 | 是 | 按列名查詢,原有的列跳過 |
改列名 | 否 | 按列名查詢不到old_field值,能查詢到new_field的值 |
Hive查詢MOR的rt表有些問題,此處不再細述,此處修改列操作後都同步Hive後設資料
操作型別 | 是否支援 | 原因 |
---|---|---|
新增列 | 是 | 按列名查詢基礎檔案,檔案沒有的列返回null |
刪除列 | 是 | 按列名查詢基礎檔案,檔案原有列跳過 |
改列名 | 否 | 按列名查詢不到old_field值,能查詢到new_field的值 |
由於hive的查詢依據的是hive metastore中的唯一版本的後設資料,資料修改列後還需要同步到hive後才能查詢到表的變更,該過程唯讀取時間軸中最新提交的schema,且查詢使用的類 org.apache.hudi.hadoop.HoodieParquetInputFormat 中並不存在針對schema完整變更做出的改動,所以測試結果與 spark2.* 或hoodie.schema.on.read.enable=false
的情況相當。
重新命名列的情況下,查詢不到改名後的列名對應的資料。需要所有檔案組都在改列名後產生新的基礎檔案後,資料才準確。
由於Presto同樣使用hive的後設資料,330的presto遇到的問題和hive遇到的問題一致,查詢rt表仍為查詢ro表
trino-360 和 presto275 使用某個patch支援查詢rt表後,查詢ro表問題如下:
操作型別 | 是否支援 | 原因 |
---|---|---|
新增列 | 否 | 按順序查詢基礎檔案,導致串列,新增列在ts列之前可能丟擲異常 |
刪除列 | 否 | 按順序查詢基礎檔案,導致串列,因為ts型別很可能丟擲異常 |
改列名 | 是 | 按順序查詢基礎檔案,名字不同,順序相同 |
出現串列異常,除非所有檔案組的最新基礎檔案都是修改列操作之後產生的,才能準確。
原因大致為:這些版本中查詢hudi表,讀取parquet檔案中資料時按順序和查詢schema對應,而非使用parquet檔案自身攜帶的schema去對應
查詢rt表如下:
操作型別 | 是否支援 | 原因 |
---|---|---|
新增列 | 是 | 按列名查詢基礎檔案和紀錄檔檔案,檔案沒有的列返回null |
刪除列 | 是 | 按列名查詢基礎檔案和紀錄檔檔案,檔案原有列跳過 |
改列名 | 否 | 按列名查詢不到old_field值,能查詢到new_field的值 |
可見查詢rt表仍按parquet檔案的schema對應,所以沒有上述串列問題,等效於 spark2.* 或hoodie.schema.on.read.enable=false
的情況
目前該方案在Spark引擎上支援完整schema演變, 降低生產環境下上游欄位變更的處理成本。但該方案還比較粗糙,後續有以下方面可以繼續改進
PS:如果您覺得閱讀本文對您有幫助,請點一下「推薦」按鈕,您的「推薦」,將會是我不竭的動力!
作者:leesf 掌控之中,才會成功;掌控之外,註定失敗。
出處:http://www.cnblogs.com/leesf456/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連線,否則保留追究法律責任的權利。
如果覺得本文對您有幫助,您可以請我喝杯咖啡!