詳解 Apache Hudi Schema Evolution(模式演進)

2022-07-24 18:02:23

Schema Evolution(模式演進)允許使用者輕鬆更改 Hudi 表的當前模式,以適應隨時間變化的資料。 從 0.11.0 版本開始,支援 Spark SQL(spark3.1.x 和 spark3.2.1)對 Schema 演進的 DDL 支援並且標誌為實驗性的。

場景

  • 可以新增、刪除、修改和移動列(包括巢狀列)
  • 分割區列不能演進
  • 不能對 Array 型別的巢狀列進行新增、刪除或操作

SparkSQL模式演進以及語法描述

使用模式演進之前,請先設定spark.sql.extensions,對於spark 3.2.x,需要設定spark.sql.catalog.spark_catalog

# Spark SQL for spark 3.1.x
spark-sql --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.1.2 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# Spark SQL for spark 3.2.1
spark-sql --packages org.apache.hudi:hudi-spark3-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:3.2.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

啟動spark app後,請執行set schema.on.read.enable=true開啟模式演進

當前模式演進開啟後不能關閉

新增列

語法

-- add columns
ALTER TABLE Table name ADD COLUMNS(col_spec[, col_spec ...])

引數描述

引數 描述
tableName 表名
col_spec 列定義,由五個欄位組成,col_name, col_type, nullable, comment, col_position

col_name : 新列名,強制必須存在,如果在巢狀型別中新增子列,請指定子列的全路徑

範例

  • 在巢狀型別users struct<name: string, age int>中新增子列col1,設定欄位為users.col1
  • 在巢狀map型別member map<string, struct<n: string, a: int>>中新增子列col1, 設定欄位為member.value.col1

col_type : 新列的型別
nullable : 新列是否可為null,可為空,當前Hudi中並未使用
comment : 新列的註釋,可為空
col_position : 列新增的位置,值可為FIRST或者AFTER 某欄位

  • 如果設定為FIRST,那麼新加的列在表的第一列
  • 如果設定為AFTER 某欄位,將在某欄位後新增新列
  • 如果設定為空,只有當新的子列被新增到巢狀列時,才能使用 FIRST。 不要在頂級列中使用 FIRST。 AFTER 的使用沒有限制。

範例

alter table h0 add columns(ext0 string);
alter table h0 add columns(new_col int not null comment 'add new column' after col1);
alter table complex_table add columns(col_struct.col_name string comment 'add new column to a struct col' after col_from_col_struct);

修改列

語法

-- alter table ... alter column
ALTER TABLE Table name ALTER [COLUMN] col_old_name TYPE column_type [COMMENT] col_comment[FIRST|AFTER] column_name

引數描述

引數 描述
tableName 表名
col_old_name 待修改的列名
column_type 新的列型別
col_comment 列comment
column_name 列名,放置目標列的新位置。 例如,AFTER column_name 表示目標列放在 column_name 之後

範例

--- Changing the column type
ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint

--- Altering other attributes
ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL

列型別變更矩陣表

源列型別\目標列型別 long float double string decimal date int
int Y Y Y Y Y N Y
long Y N Y Y Y N N
float N Y Y Y Y N N
double N N Y Y Y N N
decimal N N N Y Y N N
string N N N Y Y Y N
date N N N Y N Y N

刪除列

語法

-- alter table ... drop columns
ALTER TABLE tableName DROP COLUMN|COLUMNS cols

範例

ALTER TABLE table1 DROP COLUMN a.b.c
ALTER TABLE table1 DROP COLUMNS a.b.c, x, y

修改列名

語法

-- alter table ... rename column
ALTER TABLE tableName RENAME COLUMN old_columnName TO new_columnName

範例

ALTER TABLE table1 RENAME COLUMN a.b.c TO x

修改表屬性

語法

-- alter table ... set|unset
ALTER TABLE Table name SET|UNSET tblproperties

範例

ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value')
ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key')

修改表名

語法

-- alter table ... rename
ALTER TABLE tableName RENAME TO newTableName

範例

ALTER TABLE table1 RENAME TO table2

0.11.0之前的模式演進

模式演進是資料管理的一個非常重要的方面。 Hudi 支援開箱即用的常見模式演進場景,例如新增可為空的欄位或提升欄位的資料型別。 此外,演進後的模式可以跨引擎查詢,例如 Presto、Hive 和 Spark SQL。 下表總結了與不同 Hudi 表型別相容的Schema變更型別。

Schema變更 COW MOR 說明
在最後的根級別新增一個新的可為空列 Yes Yes Yes意味著具有演進模式的寫入成功並且寫入之後的讀取成功讀取整個資料集
向內部結構新增一個新的可為空列(最後) Yes Yes
新增具有預設值的新複雜型別欄位(map和array) Yes Yes
新增新的可為空列並更改欄位的順序 No No 如果使用演進模式的寫入僅更新了一些基本檔案而不是全部,則寫入成功但讀取失敗。 目前Hudi 不維護模式登入檔,其中包含跨基礎檔案的更改歷史記錄。 然而如果 upsert 觸及所有基本檔案,則讀取將成功
新增自定義可為空的 Hudi 元列,例如 _hoodie_meta_col Yes Yes
將根級別欄位的資料型別從 int 提升為 long Yes Yes 對於其他型別,Hudi 支援與Avro相同 Avro schema resolution
.
將巢狀欄位的資料型別從 int 提升為 long Yes Yes
對於複雜型別(map或array的值),將資料型別從 int 提升為 long Yes Yes
在最後的根級別新增一個新的不可為空的列 No No 對於Spark資料來源的MOR表,寫入成功但讀取失敗。 作為一種解決方法,您可以使該欄位為空
向內部結構新增一個新的不可為空的列(最後) No No
將巢狀欄位的資料型別從 long 更改為 int No No
將複雜型別的資料型別從 long 更改為 int(對映或陣列的值) No No

讓我們通過一個範例來演示 Hudi 中的模式演進支援。 在下面的範例中,我們將新增一個新的字串欄位並將欄位的資料型別從 int 更改為 long。

Welcome to
    ____              __
    / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
    /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
    /_/

    Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
    Type in expressions to have them evaluated.
    Type :help for more information.

scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val tableName = "hudi_trips_cow"
    tableName: String = hudi_trips_cow
scala> val basePath = "file:///tmp/hudi_trips_cow"
    basePath: String = file:///tmp/hudi_trips_cow
scala> val schema = StructType( Array(
    | StructField("rowId", StringType,true),
    | StructField("partitionId", StringType,true),
    | StructField("preComb", LongType,true),
    | StructField("name", StringType,true),
    | StructField("versionId", StringType,true),
    | StructField("intToLong", IntegerType,true)
    | ))
    schema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,IntegerType,true))
    
scala> val data1 = Seq(Row("row_1", "part_0", 0L, "bob", "v_0", 0),
    |                Row("row_2", "part_0", 0L, "john", "v_0", 0),
    |                Row("row_3", "part_0", 0L, "tom", "v_0", 0))
    data1: Seq[org.apache.spark.sql.Row] = List([row_1,part_0,0,bob,v_0,0], [row_2,part_0,0,john,v_0,0], [row_3,part_0,0,tom,v_0,0])

scala> var dfFromData1 = spark.createDataFrame(data1, schema)
scala> dfFromData1.write.format("hudi").
    |   options(getQuickstartWriteConfigs).
    |   option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
    |   option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
    |   option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
    |   option("hoodie.index.type","SIMPLE").
    |   option(TABLE_NAME.key, tableName).
    |   mode(Overwrite).
    |   save(basePath)

scala> var tripsSnapshotDF1 = spark.read.format("hudi").load(basePath + "/*/*")
    tripsSnapshotDF1: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 9 more fields]

scala> tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot")

scala> spark.sql("desc hudi_trips_snapshot").show()
    +--------------------+---------+-------+
    |            col_name|data_type|comment|
    +--------------------+---------+-------+
    | _hoodie_commit_time|   string|   null|
    |_hoodie_commit_seqno|   string|   null|
    |  _hoodie_record_key|   string|   null|
    |_hoodie_partition...|   string|   null|
    |   _hoodie_file_name|   string|   null|
    |               rowId|   string|   null|
    |         partitionId|   string|   null|
    |             preComb|   bigint|   null|
    |                name|   string|   null|
    |           versionId|   string|   null|
    |           intToLong|      int|   null|
    +--------------------+---------+-------+
    
scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong from hudi_trips_snapshot").show()
    +-----+-----------+-------+----+---------+---------+
    |rowId|partitionId|preComb|name|versionId|intToLong|
    +-----+-----------+-------+----+---------+---------+
    |row_3|     part_0|      0| tom|      v_0|        0|
    |row_2|     part_0|      0|john|      v_0|        0|
    |row_1|     part_0|      0| bob|      v_0|        0|
    +-----+-----------+-------+----+---------+---------+

// In the new schema, we are going to add a String field and 
// change the datatype `intToLong` field from  int to long.
scala> val newSchema = StructType( Array(
    | StructField("rowId", StringType,true),
    | StructField("partitionId", StringType,true),
    | StructField("preComb", LongType,true),
    | StructField("name", StringType,true),
    | StructField("versionId", StringType,true),
    | StructField("intToLong", LongType,true),
    | StructField("newField", StringType,true)
    | ))
    newSchema: org.apache.spark.sql.types.StructType = StructType(StructField(rowId,StringType,true), StructField(partitionId,StringType,true), StructField(preComb,LongType,true), StructField(name,StringType,true), StructField(versionId,StringType,true), StructField(intToLong,LongType,true), StructField(newField,StringType,true))

scala> val data2 = Seq(Row("row_2", "part_0", 5L, "john", "v_3", 3L, "newField_1"),
    |                Row("row_5", "part_0", 5L, "maroon", "v_2", 2L, "newField_1"),
    |                Row("row_9", "part_0", 5L, "michael", "v_2", 2L, "newField_1"))
    data2: Seq[org.apache.spark.sql.Row] = List([row_2,part_0,5,john,v_3,3,newField_1], [row_5,part_0,5,maroon,v_2,2,newField_1], [row_9,part_0,5,michael,v_2,2,newField_1])

scala> var dfFromData2 = spark.createDataFrame(data2, newSchema)
scala> dfFromData2.write.format("hudi").
    |   options(getQuickstartWriteConfigs).
    |   option(PRECOMBINE_FIELD_OPT_KEY.key, "preComb").
    |   option(RECORDKEY_FIELD_OPT_KEY.key, "rowId").
    |   option(PARTITIONPATH_FIELD_OPT_KEY.key, "partitionId").
    |   option("hoodie.index.type","SIMPLE").
    |   option(TABLE_NAME.key, tableName).
    |   mode(Append).
    |   save(basePath)

scala> var tripsSnapshotDF2 = spark.read.format("hudi").load(basePath + "/*/*")
    tripsSnapshotDF2: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 10 more fields]

scala> tripsSnapshotDF2.createOrReplaceTempView("hudi_trips_snapshot")

scala> spark.sql("desc hudi_trips_snapshot").show()
    +--------------------+---------+-------+
    |            col_name|data_type|comment|
    +--------------------+---------+-------+
    | _hoodie_commit_time|   string|   null|
    |_hoodie_commit_seqno|   string|   null|
    |  _hoodie_record_key|   string|   null|
    |_hoodie_partition...|   string|   null|
    |   _hoodie_file_name|   string|   null|
    |               rowId|   string|   null|
    |         partitionId|   string|   null|
    |             preComb|   bigint|   null|
    |                name|   string|   null|
    |           versionId|   string|   null|
    |           intToLong|   bigint|   null|
    |            newField|   string|   null|
    +--------------------+---------+-------+


scala> spark.sql("select rowId, partitionId, preComb, name, versionId, intToLong, newField from hudi_trips_snapshot").show()
    +-----+-----------+-------+-------+---------+---------+----------+
    |rowId|partitionId|preComb|   name|versionId|intToLong|  newField|
    +-----+-----------+-------+-------+---------+---------+----------+
    |row_3|     part_0|      0|    tom|      v_0|        0|      null|
    |row_2|     part_0|      5|   john|      v_3|        3|newField_1|
    |row_1|     part_0|      0|    bob|      v_0|        0|      null|
    |row_5|     part_0|      5| maroon|      v_2|        2|newField_1|
    |row_9|     part_0|      5|michael|      v_2|        2|newField_1|
    +-----+-----------+-------+-------+---------+---------+----------+