巨量資料下一代變革之必研究資料湖技術Hudi原理實戰雙管齊下-中

2022-11-24 06:00:50

@

核心原理

資料寫

寫操作

  • UPSERT:預設行為,資料先通過 index 打標(INSERT/UPDATE),有一些啟發式演演算法決定訊息的組織以優化檔案的大小 => CDC 匯入。
  • INSERT:跳過 index,寫入效率更高 => Log Deduplication。
  • BULK_INSERT:寫排序,對巨量資料量的 Hudi 表初始化友好,是對檔案大小的限制最好效果(寫 HFile)。

UPSERT寫流程

  • Copy On Write (COW)
    • 先對 records 按照 record key 去重。
    • 首先對這批資料建立索引 (HoodieKey => HoodieRecordLocation);通過索引區分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)。
    • 對於 update 訊息,會直接找到對應 key 所在的最新 FileSlice 的 base 檔案,並做 merge 後寫新的 base file (新的 FileSlice)。
    • 對於 insert 訊息,會掃描當前 partition 的所有 SmallFile(小於一定大小的 base file),然後 merge 寫新的 FileSlice;如果沒有 SmallFile,直接寫新的 FileGroup + FileSlice。
  • Merge On Read (MOR)
    • 先對 records 按照 record key 去重(可選)。
    • 首先對這批資料建立索引 (HoodieKey => HoodieRecordLocation);通過索引區分哪些 records 是 update,哪些 records 是 insert(key 第一次寫入)。
    • 如果是 insert 訊息,如果 log file 不可建索引(預設),會嘗試 merge 分割區內最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果沒有 base file 就新寫一個 FileGroup + FileSlice + base file;如果 log file 可建索引,嘗試 append 小的 log file,如果沒有就新寫一個 FileGroup + FileSlice + base file。
    • 如果是 update 訊息,寫對應的 file group + file slice,直接 append 最新的 log file(如果碰巧是當前最小的小檔案,會 merge base file,生成新的 file slice)。
    • log file 大小達到閾值會 roll over 一個新的。

INSERT寫流程

  • Copy On Write
    • 先對 records 按照 record key 去重(可選)。
    • 不會建立 Index。
    • 如果有小的 base file 檔案,merge base file,生成新的 FileSlice + base file,否則直接寫新的 FileSlice + base file。
  • Merge On Read
    • 先對 records 按照 record key 去重(可選)。
    • 不會建立 Index。
    • 如果 log file 可索引,並且有小的 FileSlice,嘗試追加或寫最新的 log file;如果 log file 不可索引,寫一個新的 FileSlice + base file。

INSERT OVERWRIT寫流程

在同一分割區中建立新的檔案組集,現有的檔案組被標記為 「刪除」,根據新記錄的數量建立新的檔案組。

COW流程如下

MOR流程如下

  • 優點
    • COW和MOR在執行方面非常相似。不干擾MOR的compaction。
    • 減少parquet檔案大小。
    • 不需要更新關鍵路徑中的外部索引。索引實現可以檢查檔案組是否無效(類似於在HBaseIndex中檢查commit是否無效的方式)。
    • 可以擴充套件清理策略,在一定的時間視窗後刪除舊檔案組。
  • 缺點
    • 需要轉發以前提交的後設資料。
    • 在t1,比如file1被標記為無效,我們在t1.commit中儲存 「invalidFiles=file1」(或者在MOR中儲存deltacommit)。
    • 在t2,比如file2也被標記為無效。我們轉發之前的檔案,並在t2.commit中標記 「invalidFiles=file1, file2」(或MOR的deltacommit)。
    • 忽略磁碟中存在的parquet檔案也是Hudi的一個新行為, 可能容易出錯,我們必須認識到新的行為,並更新檔案系統的所有檢視來忽略它們。這一點可能會在實現其他功能時造成問題。

Key 生成策略

用來生成 HoodieKey(record key + partition path),目前支援以下策略:

  • 支援多個欄位組合 record keys。
  • 支援多個欄位組合的 parition path (可客製化時間格式,Hive style path name)。
  • 非分割區表

刪除策略

  • 邏輯刪:將 value 欄位全部標記為 null。
  • 物理刪:
    • 通過 OPERATION_OPT_KEY 刪除所有的輸入記錄。
    • 設定 PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload 刪除所有的輸入記錄。
    • 在輸入記錄新增欄位:_hoodie_is_deleted。

寫流程歸納

通過對寫流程的梳理可以瞭解到 Apache Hudi 相對於其他資料湖方案的核心優勢:

  • 寫入過程充分優化了檔案儲存的小檔案問題,Copy On Write 寫會一直將一個 bucket (FileGroup)的 base 檔案寫到設定的閾值大小才會劃分新的 bucket;Merge On Read 寫在同一個 bucket 中,log file 也是一直 append 直到大小超過設定的閾值 roll over。
  • 對 UPDATE 和 DELETE 的支援非常高效,一條 record 的整個生命週期操作都發生在同一個 bucket,不僅減少小檔案數量,也提升了資料讀取的效率(不必要的 join 和 merge)。

資料讀

  • Snapshot 讀:讀取所有 partiiton 下每個 FileGroup 最新的 FileSlice 中的檔案,Copy On Write 表讀 parquet 檔案,Merge On Read 表讀 parquet + log 檔案。
  • Incremantal讀:當前的 Spark data source 可以指定消費的起始和結束 commit 時間,讀取 commit 增量的資料集。但是內部的實現不夠高效:拉取每個 commit 的全部目標檔案再按照系統欄位 hoodie_commit_time apply 過濾條件。
  • Streaming讀:0.8.0 版本的 HUDI Flink writer 支援實時的增量訂閱,可用於同步 CDC 資料,日常的資料同步 ETL pipeline。Flink 的 streaming 讀做到了真正的流式讀取,source 定期監控新增的改動檔案,將讀取任務下派給讀 task。
  • Compaction 合併
    • 沒有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 並寫 base file。
    • 有 base file:走 copy on write upsert 流程,先讀 log file 建 index,再讀 base file,最後讀 log file 寫新的 base file。
    • Flink 和 Spark streaming 的 writer 都可以 apply 非同步的 compaction 策略,按照間隔 commits 數或者時間來觸發 compaction 任務,在獨立的 pipeline 中執行。

整合Spark使用

環境準備

  • 安裝Spark

Hudi使用Spark-2.4.3+和Spark 3。x版本。Hudi支援的Spark版本如下:

解壓spark-3.3.0-bin-hadoop3.tgz,設定Spark環境變數

vim /etc/profile
export SPARK_HOME=/home/commons/spark-3.3.0-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH

source /etc/profile

然後將前面編譯的hudi-spark3.3-bundle_2.12-0.12.1.jar(在hudi的根目錄下packaging/hudi-spark-bundle/target/,至於如何編譯請看前面的內容)拷貝到Spark根目錄下Jars目錄。

  • 啟動hadoop(詳細看前面關於hadoop的文章)

spark-shell使用

啟動

不同版本(Spark 3.3、Spark 3.2、Spark 3.1、Spark 2.4)的spark-shell啟動命令有所不同,下面以Spark 3.3來操作演示。

spark-shell \
  --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

接下來設定表名、基本路徑和資料生成器

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

DataGenerator可以根據旅行應用生成相應的樣例資料插入和更新;spark中不需要單獨的create table命令如果表不存在,第一批寫入操作將建立該表。

插入資料

接下來通過DataGenerator生成一些新的行程資料,將它們載入到DataFrame中,並將DataFrame寫入Hudi表中。

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

mode為Overwrite如果表存在則覆蓋重新建立表。可以從basePath = "file:///tmp/hudi_trips_cow" 設定的本地檔案目錄檢視hoodie的後設資料和資料的變化。

還可以通過外部化組態檔,可以在組態檔Hudi -default.conf中集中設定設定,而不是直接將設定設定傳遞給每個Hudi作業。

查詢資料

先轉成spark的df,然後再執行spark sql的查詢

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

每個hoodie表固定加了如下的五個欄位,hoodie提交時間、hoodie提交序號、hoodie記錄鍵、hoodie分割區路徑、hoodie檔名。

更新資料

類似於插入新資料,同樣使用資料生成器生成新的行程的資料,載入到DataFrame中,並將DataFrame寫入hudi表。

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

注意,現在儲存模式是追加。通常,總是使用追加模式,除非您試圖第一次建立表。再次查詢資料將顯示更新的行程。每個寫操作都會生成一個由時間戳表示的新提交。在之前的提交中尋找相同的_hoodie_record_keys的_hoodie_commit_time、rider、driver欄位的變化。

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, fare, begin_lon, begin_lat, ts,rider, driver from  hudi_trips_snapshot").show()

查詢更新後的資料,已經有部分未更新後的資料,提交時間也有其他的值,記錄數還是10條。

檢視hoodie目錄下已經多個一個版本檔案

時間旅行查詢

從0.9.0開始支援時間旅行查詢。目前支援三種查詢時間格式,如下所示

val tripsSnapshotDF = spark.read.
  format("hudi").
  option("as.of.instant", "20221122143158632").
  load(basePath)

spark.read.
  format("hudi").
  option("as.of.instant", "2022-11-22 14:31:58.632").
  load(basePath)

// 等價於"as.of.instant = 2022-11-22 00:00:00"
spark.read.
  format("hudi").
  option("as.of.instant", "2022-11-22").
  load(basePath)

使用第一種範例如下:

val tripsSnapshotDF1 = spark.read.
  format("hudi").
  option("as.of.instant", "20221121184124298").
  load(basePath)

tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot1")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, fare, begin_lon, begin_lat, ts,rider, driver from  hudi_trips_snapshot1").show()

增量查詢

Hudi還提供了獲取自給定提交時間戳以來更改的記錄流的功能。這可以通過使用Hudi的增量查詢來實現,並提供需要流化更改的開始時間。如果希望在給定的提交之後進行所有更改(通常是這樣),則不需要指定endTime。這將給出在beginTime提交後發生的所有更改,過濾器為fare > 20.0。該特性的獨特之處在於,它現在允許您在批次處理資料上編寫流管道。利用增量管道可以在批次處理資料上建立增量管道。

先將上面的更新資料多執行幾次,產生多個版本的資料

spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)

val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

指定時間點查詢

時間可以通過將endTime指向特定的提交時間,將beginTime指向「000」(表示儘可能早的提交時間)來表示。

val beginTime = "000" 
val endTime = commits(commits.length - 2) 

val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

刪除資料

Apache Hudi支援兩種型別的刪除:

  • 軟刪除:保留記錄鍵,只清除所有其他欄位的值(軟刪除中為空的記錄始終儲存在儲存中,而不會刪除)。注意,儲存模式是追加。

先查詢當前記錄數

spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

執行軟刪除後檢視記錄數,有兩條被置為空。

val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)

val nullifyColumns = softDeleteDs.schema.fields.
  map(field => (field.name, field.dataType.typeName)).
  filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
    && !Array("ts", "uuid", "partitionpath").contains(pair._1)))

val softDeleteDf = nullifyColumns.
  foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
    (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))

softDeleteDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "upsert").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
spark.sql("select uuid, partitionpath from hudi_trips_snapshot where rider is not null").count()

  • 硬刪除:從表中物理刪除記錄的任何痕跡。

刪除傳進來的hoodiekey記錄,刪除操作只支援「追加」模式。

spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

val deletes = dataGen.generateDeletes(ds.collectAsList())
val hardDeleteDf = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

hardDeleteDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)

roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

覆蓋資料

生成一些新的行程資料,覆蓋輸入中出現的所有分割區。對於批次處理ETL作業,此操作比upsert快,批次處理ETL作業一次重新計算整個目標分割區(與增量更新目標表相反)。這是由於能夠完全繞過索引、預合併和upsert寫路徑中的其他重分割區步驟。

先檢視當前的key資料

spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)

執行覆蓋資料操作(類似hive的insert overwrite的功能)後檢視key的資料。

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.
  read.json(spark.sparkContext.parallelize(inserts, 2)).
  filter("partitionpath = 'americas/united_states/san_francisco'")
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(),"insert_overwrite").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(Append).
  save(basePath)

spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)

spark-sql使用

啟動

Hudi支援使用Spark SQL與HoodieSparkSessionExtension SQL擴充套件寫和讀資料。在解壓的目錄下執行Spark SQL和Hudi:

  • 啟動hive的後設資料服務
nohup hive --service metastore &
  • 啟動spark-sql,如果沒有設定hive的環境變數,拷貝hive-site.xml到spark的conf目錄。不同版本(Spark 3.3、Spark 3.2、Spark 3.1、Spark 2.4)的spark-sql啟動命令有所不同,下面以Spark 3.3來操作演示
spark-sql --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.12.1 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

建立表

Spark SQL需要一個顯式的create table命令。

  • Hudi的兩種表型別:即寫時複製(COW)和讀時合併(MOR),都可以使用Spark SQL建立。在建立表時,可以使用type選項指定表的型別:type = 'cow'或type = 'mor'。
  • 分割區表和非分割區表:使用者可以在Spark SQL中建立分割區表或非分割區表。要建立分割區表,需要使用partitioned by語句指定分割區列以建立分割區表。當沒有使用create table命令進行分割區的語句時,該表被認為是一個非分割區表。
  • 內部管理表和外部表:通常,Spark SQL支援兩種表,即託管表和外部表。如果使用location語句或使用create external table顯式地建立表來指定一個位置,則它是一個外部表,否則它被認為是一個內部管理表。

接下來通過實際sql演示如何建立不同的表。

  • 建立一個非分割區表

    • 建立一個cow表,預設primaryKey 'uuid',不提供preCombineField。
    create database hudi_spark;
    use hudi_spark;
    create table hudi_cow_nonpcf_tbl (
      uuid int,
      name string,
      price double
    ) using hudi;
    

    • 建立一個提供preCombineField的mor非分割區表
    create table hudi_mor_tbl (
      id int,
      name string,
      price double,
      ts bigint
    ) using hudi
    tblproperties (
      type = 'mor',
      primaryKey = 'id',
      preCombineField = 'ts'
    );
    
  • 建立外部COW分割區表

create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

  • 為已有的Hudi Table建立Table,可以在現有的hudi表上建立一個表(用spark-shell或deltastreamer建立)。這對於對已有的hudi表進行讀寫非常有用。
create table hudi_existing_tbl using hudi
location '/tmp/hudi/hudi_cow_pt_tbl';
  • CTAS,Hudi 支援在Spark SQL使用CTAS (Create Table As Select)

    • 建立一個不帶preCombineField的非分割區cow表
    create table hudi_ctas_cow_nonpcf_tbl
    using hudi
    tblproperties (primaryKey = 'id')
    as
    select 1 as id, 'a1' as name, 10 as price;
    
    • 使用範例建立一個分割區的主鍵COW表。
    create table hudi_ctas_cow_pt_tbl
    using hudi
    tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
    partitioned by (dt)
    as
    select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
    

    • 通過CTAS從另一個表載入資料,file://代表本地檔案目錄
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (datestr) as select * from parquet_mngd;

建立表屬性可以在建立hudi表時設定表屬性,關鍵選項如下:

  • primaryKey:表的主鍵名,多個欄位用逗號分隔。與hoodie.datasource.write.recordkey.field相同,預設為uuid。
  • preCombineField:表的預合併欄位,與hoodie.datasource.write.precombine.field相同。
  • type:建立的表型別,type = 'cow'表示COPY-ON-WRITE表,而type = 'mor'表示MERGE-ON-READ表。與hoodie.datasource.write.table.type相同。

插入資料

-- 插入非分割區表
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
-- 插入動態分割區
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;
-- 插入靜態分割區
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;

-- precombinefield提供的表的upsert模式
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;

-- bulk_insert模式用於precombinefield提供的表
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;
insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;

時間旅行查詢

create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

insert into hudi_cow_pt_tbl select 3, 'c0', 1000, '2022-11-23', '10';
select * from hudi_cow_pt_tbl;

-- 記錄id=3 修改 `name`
insert into hudi_cow_pt_tbl select 3, 'c1', 1001, '2022-11-23', '10';
select * from hudi_cow_pt_tbl;

-- 基於第一次提交時間的時間旅行,假設 `20220307091628793`
select * from hudi_cow_pt_tbl timestamp as of '20221123153135498' where id = 3;
-- 基於不同時間戳格式的時間旅行
select * from hudi_cow_pt_tbl timestamp as of '2022-11-23 15:31:35.498' where id = 3;
select * from hudi_cow_pt_tbl timestamp as of '2022-11-23' where id = 3;

更新資料

update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;
update hudi_cow_pt_tbl set ts = 1005 where name = 'a1_1';

  • 使用hudi測試合併到非分割區表的源表
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;
select * from hudi_mor_tbl ;

  • 源表使用拼花測試合併到分割區表
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2022-11-23', '10'), (2, "new_a2", 'delete', '2022-11-23', '11'), (3, "new_a3", 'insert', '2022-11-23', '12');

merge into hudi_cow_pt_tbl as target
using (
  select id, name, '1000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
 update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
 insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;

刪除資料

delete from hudi_cow_nonpcf_tbl where uuid = 1;
delete from hudi_mor_tbl where id % 2 = 0;
delete from hudi_cow_pt_tbl where name = 'a1';

覆蓋資料

insert覆蓋分割區表使用INSERT_OVERWRITE_TABLE型別的寫操作,而非分割區表使用INSERT_OVERWRITE_TABLE型別的寫操作。

-- 插入覆蓋非分割區表
insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

-- 用動態分割區插入覆蓋分割區表
insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09', '10';

-- 用靜態分割區插入覆蓋分割區表
insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;

其他

-- 改表名
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
-- 新增列
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
-- 修改列
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
-- 設定表屬性
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
-- 顯示分割區
show partitions hudi_cow_pt_tbl;
-- 刪除分割區
alter table hudi_cow_pt_tbl drop partition (dt='2022-11-23', hh='10');

本人部落格網站IT小神 www.itxiaoshen.com