Implementing SCD-2 (Slowly Changing Dimensions) with Apache Hudi

2022-10-16 12:03:08

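Data is a valuable asset in today's analytics world. When serving data to end users, it is important to track how that data changes over time. A slowly changing dimension (SCD) is a dimension that stores and manages both current and historical data over time. Among the SCD types, we focus on Type 2 (SCD-2), which retains the full history of values. Each record carries an effective time and an expiration time that identify the period during which the record was active. This can be achieved with a few audit columns, for example: effective start date, effective end date, and an active record indicator.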
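
As a rough illustration of those audit columns, the sketch below models one dimension row in plain Scala, without Spark or Hudi. The names `Scd2Model`, `DiscountVersion` and `versionAt` are hypothetical and exist only for this example; the values are taken from the product table used later in this article.

```scala
import java.time.LocalDateTime

object Scd2Model {
  // Sentinel "open-ended" expiry used for the currently active version.
  val HighDate: LocalDateTime = LocalDateTime.of(9999, 12, 31, 23, 59, 59)

  // One version of a dimension row plus the three audit columns.
  final case class DiscountVersion(
      sellerId: Int,
      prodCategory: String,
      discountPercentage: Int,
      effStartTs: LocalDateTime,
      effEndTs: LocalDateTime,
      actvInd: Int)

  // Returns the version in effect at `at`, if any (both bounds inclusive).
  def versionAt(history: Seq[DiscountVersion], at: LocalDateTime): Option[DiscountVersion] =
    history.find(v => !at.isBefore(v.effStartTs) && !at.isAfter(v.effEndTs))

  // Two versions of the same business key: an expired one and the active one.
  val history: Seq[DiscountVersion] = Seq(
    DiscountVersion(1234, "Detergent", 15,
      LocalDateTime.of(2021, 12, 15, 15, 20, 30),
      LocalDateTime.of(2022, 1, 31, 10, 0, 29), 0),
    DiscountVersion(1234, "Detergent", 25,
      LocalDateTime.of(2022, 1, 31, 10, 0, 30), HighDate, 1))
}
```

Because every version carries its own validity interval, a lookup for any moment in time is a single filter over the table rather than a replay of past writes.
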
Let's see how this SCD-2 table design can be implemented with Apache Hudi.

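Apache Hudi is a next-generation streaming data lake platform. It brings core warehouse and database functionality directly to the data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexes, streaming ingestion services, data clustering/compaction optimizations, and concurrency, all while keeping the data in open source file formats.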

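By default, Apache Hudi exposes the snapshot view of a table, that is, the latest data as of the most recent commit. To track historical changes, we need to use Hudi's point-in-time queries (https://hudi.apache.org/docs/quick-start-guide).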

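Hudi lets us read either older versions of the data or the latest data through point-in-time queries and time travel. Walking through the full change history this way is inefficient, however, because it requires analyzing the data over many separate time intervals.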
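
To make that cost concrete, here is a toy model in plain Scala. It is not the Hudi API: `TimelineSketch`, `asOf` and `historyOf` are hypothetical names, and the two commit instants and discount values are borrowed from the tables shown later in this article. The sketch assumes each commit exposes a full snapshot and shows that rebuilding the history of one key takes one point-in-time read per commit.

```scala
object TimelineSketch {
  // A snapshot maps a business key to its discount as of one commit.
  type Snapshot = Map[String, Int]

  // Commit instants in ascending order, each with the snapshot it produced.
  val commits: Vector[(String, Snapshot)] = Vector(
    "20220722113258101" -> Map("1234/Detergent" -> 15, "4565/Gourmet" -> 30),
    "20220722114049500" -> Map("1234/Detergent" -> 25, "4565/Gourmet" -> 45))

  // Point-in-time read: the latest snapshot whose commit instant is <= instant.
  def asOf(instant: String): Snapshot =
    commits.takeWhile(_._1 <= instant).lastOption.map(_._2).getOrElse(Map.empty)

  // Rebuilding the history of one key needs one point-in-time read per commit.
  def historyOf(key: String): Vector[(String, Int)] =
    commits.map(_._1).flatMap(c => asOf(c).get(key).map(v => c -> v))
}
```

With many commits, `historyOf` issues many reads, whereas an SCD-2 table answers the same question with a single query over the validity columns.
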
Let's see how to work around this with the classic approach.
Consider a table that holds product details and seller discounts.

+---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category |product_name   |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
+---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|3412     |Healthcare    |Dolo 650       |10             |10                 |2022-04-01 16:30:45|9999-12-31 23:59:59|1       |
|1234     |Detergent     |Tide 2L        |6              |15                 |2021-12-15 15:20:30|9999-12-31 23:59:59|1       |
|1234     |Home Essential|Hand Towel     |12             |20                 |2021-10-20 06:55:22|9999-12-31 23:59:59|1       |
|4565     |Gourmet       |Dairy Milk Silk|6              |30                 |2021-06-12 20:30:40|9999-12-31 23:59:59|1       |
+---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+

Steps

  1. Let's write this data into a Hudi table using Spark.
spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:2.4.7,org.apache.avro:avro:1.8.2 \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf "spark.sql.hive.convertMetastoreParquet=false"

Once the Spark shell is up, we can import the libraries and create the Hudi table as shown below.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.8
      /_/
         
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_312)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("""create table hudi_product_catalog (
     | seller_id int,
     | prod_category string,
     | product_name string,
     | product_package string,
     | discount_percentage string,
     | eff_start_ts timestamp,
     | eff_end_ts timestamp,
     | actv_ind int
     |  ) using hudi
     | tblproperties (
     |   type = 'cow',
     |   primaryKey = 'seller_id,prod_category,eff_end_ts',
     |   preCombineField = 'eff_start_ts'
     |  )
     | partitioned by (actv_ind)
     |  location 'gs://target_bucket/hudi_product_catalog/'""")

After the data is written to the bucket, the Hudi target table looks like this.

+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                                                       |_hoodie_partition_path|_hoodie_file_name                                                         |seller_id|prod_category |product_name   |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|20220722113258101  |20220722113258101_0_0|seller_id:3412,prod_category:Healthcare,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-29-1219_20220722113258101.parquet|3412     |Healthcare    |Dolo 650       |10             |10                 |2022-04-01 16:30:45|9999-12-31 23:59:59|1       |
|20220722113258101  |20220722113258101_0_1|seller_id:1234,prod_category:Home Essential,eff_end_ts:253402300799000000|actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-29-1219_20220722113258101.parquet|1234     |Home Essential|Hand Towel     |12             |20                 |2021-10-20 06:55:22|9999-12-31 23:59:59|1       |
|20220722113258101  |20220722113258101_0_2|seller_id:4565,prod_category:Gourmet,eff_end_ts:253402300799000000       |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-29-1219_20220722113258101.parquet|4565     |Gourmet       |Dairy Milk Silk|6              |30                 |2021-06-12 20:30:40|9999-12-31 23:59:59|1       |
|20220722113258101  |20220722113258101_0_3|seller_id:1234,prod_category:Detergent,eff_end_ts:253402300799000000     |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-29-1219_20220722113258101.parquet|1234     |Detergent     |Tide 2L        |6              |15                 |2021-12-15 15:20:30|9999-12-31 23:59:59|1       |
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
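
The `eff_end_ts` component of `_hoodie_record_key` in the output above appears as microseconds since the Unix epoch. The following sketch reproduces that key layout in plain Scala, assuming the timestamps are interpreted as UTC. `RecordKeySketch`, `epochMicros` and `recordKey` are hypothetical helpers written for this illustration and are not part of Hudi.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object RecordKeySketch {
  // Microseconds since the Unix epoch, treating the timestamp as UTC (assumption).
  def epochMicros(ts: LocalDateTime): Long =
    ts.toEpochSecond(ZoneOffset.UTC) * 1000000L

  // Mimics the "field:value,field:value" layout visible in _hoodie_record_key.
  def recordKey(sellerId: Int, prodCategory: String, effEndTs: LocalDateTime): String =
    s"seller_id:$sellerId,prod_category:$prodCategory,eff_end_ts:${epochMicros(effEndTs)}"
}
```

Since `eff_end_ts` is part of the key, the active version of a product (ending at 9999-12-31 23:59:59) and each expired version get distinct record keys, which is what lets them coexist in the table.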

  2. Suppose our incremental data is stored in the table below (not in Hudi format; it could be a Hive table).

+---------+-------------+-----------------+---------------+-------------------+-------------------+
|seller_id|prod_category|product_name     |product_package|discount_percentage|eff_start_ts       |
+---------+-------------+-----------------+---------------+-------------------+-------------------+
|1234     |Detergent    |Tide 5L          |6              |25                 |2022-01-31 10:00:30|
|4565     |Gourmet      |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|
|3345     |Stationary   |Sticky Notes     |4              |12                 |2022-07-09 21:30:45|
+---------+-------------+-----------------+---------------+-------------------+-------------------+
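  3. Now let's pick out the insert-only records in the incremental table by running a Left Anti Join against the target table.
import org.apache.spark.sql.functions._

// Read the incremental (delta) file and the current Hudi target table
val updFileDf = spark.read.option("header",true).csv("gs://target_bucket/hudi_product_catalog/hudi_product_update.csv")
val tgtHudiDf = spark.sql("select * from hudi_product_catalog")
tgtHudiDf.createOrReplaceTempView("hudiTable")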

//Cast as needed
val stgDf = updFileDf.withColumn("eff_start_ts",to_timestamp(col("eff_start_ts")))
.withColumn("seller_id",col("seller_id").cast("int"))

//Prepare an insert DF from incremental temp DF
val instmpDf = stgDf.as("stg")
      .join(tgtHudiDf.as("tgt"),
        col("stg.seller_id") === col("tgt.seller_id") &&
          col("stg.prod_category") === col("tgt.prod_category"),"left_anti")
.select("stg.*")

val insDf = instmpDf.withColumn("eff_end_ts",to_timestamp(lit("9999-12-31 23:59:59")))
.withColumn("actv_ind",lit(1))


insDf.show(false)
+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
|     3345|   Stationary|Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|
+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
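
For readers less familiar with anti joins, the same selection can be sketched with plain Scala collections. `AntiJoinSketch`, `Row` and `leftAnti` are hypothetical names used only here; the rows mirror the target and incremental tables above, reduced to three columns.

```scala
object AntiJoinSketch {
  final case class Row(sellerId: Int, prodCategory: String, productName: String)

  // Keep only staged rows whose (sellerId, prodCategory) is absent from the target.
  def leftAnti(stg: Seq[Row], tgt: Seq[Row]): Seq[Row] = {
    val tgtKeys = tgt.map(r => (r.sellerId, r.prodCategory)).toSet
    stg.filterNot(r => tgtKeys.contains((r.sellerId, r.prodCategory)))
  }

  val target: Seq[Row] = Seq(
    Row(3412, "Healthcare", "Dolo 650"),
    Row(1234, "Detergent", "Tide 2L"),
    Row(1234, "Home Essential", "Hand Towel"),
    Row(4565, "Gourmet", "Dairy Milk Silk"))

  val staged: Seq[Row] = Seq(
    Row(1234, "Detergent", "Tide 5L"),
    Row(4565, "Gourmet", "Dairy Milk Almond"),
    Row(3345, "Stationary", "Sticky Notes"))
}
```

Only the Sticky Notes row survives, because it is the only staged key with no match in the target.
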
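  4. We now have a DataFrame with the insert-only records. Next, let's build a DataFrame that carries the attributes of both the delta table and the target table, using an inner join against the target; this picks up the records that need to be updated.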
//Prepare an update DF from incremental temp DF, select columns from both the tables
val updDf = stgDf.as("stg")
      .join(tgtHudiDf.as("tgt"),
        col("stg.seller_id") === col("tgt.seller_id") &&
          col("stg.prod_category") === col("tgt.prod_category"),"inner")
          .where(col("stg.eff_start_ts") > col("tgt.eff_start_ts"))
.select((stgDf.columns.map(c => stgDf(c).as(s"stg_$c"))++ tgtHudiDf.columns.map(c => tgtHudiDf(c).as(s"tgt_$c"))):_*)

updDf.show(false)

+-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
|stg_seller_id|stg_prod_category| stg_product_name|stg_product_package|stg_discount_percentage|   stg_eff_start_ts|tgt__hoodie_commit_time|tgt__hoodie_commit_seqno|tgt__hoodie_record_key|tgt__hoodie_partition_path|tgt__hoodie_file_name|tgt_seller_id|tgt_prod_category|tgt_product_name|tgt_product_package|tgt_discount_percentage|   tgt_eff_start_ts|     tgt_eff_end_ts|tgt_actv_ind|
+-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
|         1234|        Detergent|          Tide 5L|                  6|                     25|2022-01-31 10:00:30|      20220710113622931|    20220710113622931...|  seller_id:1234,pr...|                actv_ind=1| 2dd6109f-2173-429...|         1234|        Detergent|         Tide 2L|                  6|                     15|2021-12-15 15:20:30|9999-12-31 23:59:59|           1|
|         4565|          Gourmet|Dairy Milk Almond|                 12|                     45|2022-06-12 20:30:40|      20220710113622931|    20220710113622931...|  seller_id:4565,pr...|                actv_ind=1| 2dd6109f-2173-429...|         4565|          Gourmet| Dairy Milk Silk|                  6|                     30|2021-06-12 20:30:40|9999-12-31 23:59:59|           1|
+-------------+-----------------+-----------------+-------------------+-----------------------+-------------------+-----------------------+------------------------+----------------------+--------------------------+---------------------+-------------+-----------------+----------------+-------------------+-----------------------+-------------------+-------------------+------------+
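  5. We now have a DataFrame that holds the old and the new data in a single record. Let's pull the active and inactive versions of the updated records into separate DataFrames.

While doing so, we retire each inactive record by setting its eff_end_ts to the eff_start_ts of the active (new) record minus 1 second and setting actv_ind = 0.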

//Prepare Active updates

val updActiveDf = updDf.select(col("stg_seller_id").as("seller_id"),
col("stg_prod_category").as("prod_category"),
col("stg_product_name").as("product_name"),
col("stg_product_package").as("product_package"),
col("stg_discount_percentage").as("discount_percentage"),
col("stg_eff_start_ts").as("eff_start_ts"),
to_timestamp(lit("9999-12-31 23:59:59")) as ("eff_end_ts"),
lit(1) as ("actv_ind"))

updActiveDf.show(false)
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|product_name     |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|1234     |Detergent    |Tide 5L          |6              |25                 |2022-01-31 10:00:30|9999-12-31 23:59:59|1       |
|4565     |Gourmet      |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|9999-12-31 23:59:59|1       |
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+

//Prepare inactive updates, which will become obsolete records

val updInactiveDf = updDf.select(col("tgt_seller_id").as("seller_id"),
col("tgt_prod_category").as("prod_category"),
col("tgt_product_name").as("product_name"),
col("tgt_product_package").as("product_package"),
col("tgt_discount_percentage").as("discount_percentage"),
col("tgt_eff_start_ts").as("eff_start_ts"),
(col("stg_eff_start_ts") - expr("interval 1 seconds")).as("eff_end_ts"),
lit(0) as ("actv_ind"))

scala> updInactiveDf.show
+---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|   product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
+---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
|     1234|    Detergent|        Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
|     4565|      Gourmet|Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
+---------+-------------+---------------+---------------+-------------------+-------------------+-------------------+--------+
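
The close-out rule applied to the inactive records can be written out in plain Scala as follows. This is only a sketch of the rule itself; `CloseOutSketch`, `Version` and `expire` are hypothetical names, not part of Spark or Hudi.

```scala
import java.time.LocalDateTime

object CloseOutSketch {
  final case class Version(
      productName: String,
      effStartTs: LocalDateTime,
      effEndTs: LocalDateTime,
      actvInd: Int)

  // Expire the old version one second before the new version starts.
  def expire(old: Version, newStart: LocalDateTime): Version =
    old.copy(effEndTs = newStart.minusSeconds(1), actvInd = 0)
}
```

Ending the old interval one second before the new one begins keeps the validity intervals of a key from overlapping, so any timestamp maps to at most one version.
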
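  6. Now we use the union operator to combine the inserts, the active updates, and the inactive updates into a single DataFrame, and use that DataFrame as the incremental source for the final Hudi write.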
scala> val upsertDf = insDf.union(updActiveDf).union(updInactiveDf)

scala> upsertDf.show

+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|seller_id|prod_category|     product_name|product_package|discount_percentage|       eff_start_ts|         eff_end_ts|actv_ind|
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|     3345|   Stationary|     Sticky Notes|              4|                 12|2022-07-09 21:30:45|9999-12-31 23:59:59|       1|
|     4565|      Gourmet|Dairy Milk Almond|             12|                 45|2022-06-12 20:30:40|9999-12-31 23:59:59|       1|
|     1234|    Detergent|          Tide 5L|              6|                 25|2022-01-31 10:00:30|9999-12-31 23:59:59|       1|
|     4565|      Gourmet|  Dairy Milk Silk|              6|                 30|2021-06-12 20:30:40|2022-06-12 20:30:39|       0|
|     1234|    Detergent|          Tide 2L|              6|                 15|2021-12-15 15:20:30|2022-01-31 10:00:29|       0|
+---------+-------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+

// Imports needed for the option constants and the Append save mode
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.SaveMode.Append

val path = "gs://target_bucket/hudi_product_catalog"

// Upsert on the composite key; eff_end_ts keeps expired and active versions distinct
upsertDf.write.format("org.apache.hudi")
.option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
.option("hoodie.datasource.write.keygenerator.class","org.apache.hudi.keygen.ComplexKeyGenerator")
.option(RECORDKEY_FIELD_OPT_KEY, "seller_id,prod_category,eff_end_ts")
.option(PRECOMBINE_FIELD_OPT_KEY, "eff_start_ts")
.option("hoodie.table.name","hudi_product_catalog")
.option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "target_schema")
.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "hudi_product_catalog")
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
.option(PARTITIONPATH_FIELD_OPT_KEY, "actv_ind")
.mode(Append)
.save(path)

scala> spark.sql("refresh table target_schema.hudi_product_catalog")

scala> spark.sql("select * from target_schema.hudi_product_catalog").show(false)

+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key                                                       |_hoodie_partition_path|_hoodie_file_name                                                         |seller_id|prod_category |product_name     |product_package|discount_percentage|eff_start_ts       |eff_end_ts         |actv_ind|
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
|20220722113258101  |20220722113258101_0_0|seller_id:3412,prod_category:Healthcare,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3412     |Healthcare    |Dolo 650         |10             |10                 |2022-04-01 16:30:45|9999-12-31 23:59:59|1       |
|20220722113258101  |20220722113258101_0_1|seller_id:1234,prod_category:Home Essential,eff_end_ts:253402300799000000|actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234     |Home Essential|Hand Towel       |12             |20                 |2021-10-20 06:55:22|9999-12-31 23:59:59|1       |
|20220722114049500  |20220722114049500_0_2|seller_id:4565,prod_category:Gourmet,eff_end_ts:253402300799000000       |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|4565     |Gourmet       |Dairy Milk Almond|12             |45                 |2022-06-12 20:30:40|9999-12-31 23:59:59|1       |
|20220722114049500  |20220722114049500_0_3|seller_id:1234,prod_category:Detergent,eff_end_ts:253402300799000000     |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|1234     |Detergent     |Tide 5L          |6              |25                 |2022-01-31 10:00:30|9999-12-31 23:59:59|1       |
|20220722114049500  |20220722114049500_0_4|seller_id:3345,prod_category:Stationary,eff_end_ts:253402300799000000    |actv_ind=1            |a94c9c58-ac6b-4841-a734-8ef1580e2547-0_0-72-2451_20220722114049500.parquet|3345     |Stationary    |Sticky Notes     |4              |12                 |2022-07-09 21:30:45|9999-12-31 23:59:59|1       |
|20220722114049500  |20220722114049500_1_0|seller_id:4565,prod_category:Gourmet,eff_end_ts:1655065839000000         |actv_ind=0            |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|4565     |Gourmet       |Dairy Milk Silk  |6              |30                 |2021-06-12 20:30:40|2022-06-12 20:30:39|0       |
|20220722114049500  |20220722114049500_1_1|seller_id:1234,prod_category:Detergent,eff_end_ts:1643623229000000       |actv_ind=0            |789e0317-d499-4d74-a5d9-ad6e6517d6b8-0_1-72-2452_20220722114049500.parquet|1234     |Detergent     |Tide 2L          |6              |15                 |2021-12-15 15:20:30|2022-01-31 10:00:29|0       |
+-------------------+---------------------+-------------------------------------------------------------------------+----------------------+--------------------------------------------------------------------------+---------+--------------+-----------------+---------------+-------------------+-------------------+-------------------+--------+
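
One way to sanity-check a result like the one above is to test the two properties an SCD-2 table is expected to hold: exactly one active version per business key, and no overlapping validity intervals. The sketch below does this in plain Scala on rows that have already been collected. `Scd2Check`, `Version` and `isConsistent` are hypothetical names for this illustration, and the check is a suggestion rather than part of the original workflow.

```scala
import java.time.LocalDateTime

object Scd2Check {
  final case class Version(
      sellerId: Int,
      prodCategory: String,
      effStartTs: LocalDateTime,
      effEndTs: LocalDateTime,
      actvInd: Int)

  // True when every business key has exactly one active row and its
  // versions, ordered by start time, never overlap.
  def isConsistent(rows: Seq[Version]): Boolean =
    rows.groupBy(r => (r.sellerId, r.prodCategory)).values.forall { versions =>
      val sorted = versions.sortWith((a, b) => a.effStartTs.isBefore(b.effStartTs))
      val oneActive = sorted.count(_.actvInd == 1) == 1
      val noOverlap = sorted.zip(sorted.drop(1)).forall {
        case (earlier, later) => earlier.effEndTs.isBefore(later.effStartTs)
      }
      oneActive && noOverlap
    }
}
```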

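A few points to consider during implementation

  • Every update to an existing record causes the parquet file to be rewritten/moved in storage, which can affect write performance.
  • For querying, it is always a good idea to partition the target table by the attribute that serves as the main filter, for example the sale date in a sales table, or the seller in a catalog of registered products. In the example above, actv_ind was chosen because we wanted to keep the explanation simple and store all active records in a single partition.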

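Conclusion

As we keep building Spark applications with Apache Hudi, we will continue to refine our strategy for loading data. The attempt above is only a starting point for implementing SCD-2 with Hudi.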