李呈祥:bilibili在湖倉一體查詢加速上的實踐與探索

2022-06-15 15:03:14


導讀: 本文主要介紹嗶哩嗶哩在資料湖與資料倉儲一體架構下,探索查詢加速以及索引增強的一些實踐。主要內容包括:

  • 什麼是湖倉一體架構
  • 嗶哩嗶哩目前的湖倉一體架構
  • 湖倉一體架構下,資料的排序組織優化
  • 湖倉一體架構下,索引增強與優化的實踐探索

--

01 什麼是湖倉一體

當我們講湖倉一體時,涉及到資料湖和資料倉儲兩個概念。

什麼是資料湖?通常來說,它有以下幾個特點:

  • 有一個統一的儲存系統,所有的資料都放到這個統一的儲存系統裡,沒有資料孤島。
  • 支援任意資料型別,比較自由,包括結構化、半結構化和非結構化的資料。這些不同型別的資料都可以統一放到儲存系統裡。
  • 對於多個計算引擎是開放的,包括實時、離線的分析等,計算引擎很豐富。
  • 有比較靈活的資料處理介面。有基於SQL這種級別的資料處理,也可以像基於Spark,提供更底層的,基於Dataset甚至RDD這種層面的,更甚者對於機器學習一些場景去對資料進行解析。也可以直接用檔案系統,通過API直接讀取檔案。
  • 因其靈活性,資料質量相對較低,比較難管理。

因此,基於資料湖的靈活與易用,它非常適合用於基於未知資料的探索與創新。

什麼是資料倉儲?它有以下幾個特性:

  • 強格式(schema),事前對資料進行建模
  • 有封閉的資料格式與儲存,不對其他引擎開放
  • 查詢效率高:儲存與計算的緊密結合優化,豐富的索引/預計算等支援
  • 資料入倉,無論是實時的還是離線的,或者說資料的儲存組織由數倉內部而非寫入任務決定
  • 資料質量高,容易運維管理,建設成本高

因此,高效和可靠的特性使得資料倉儲非常適用於基於已知資料的分析與決策。

嗶哩嗶哩和大部分網際網路公司一樣,之前的巨量資料平臺都是基於開源的Hadoop生態系統。儲存用的是HDFS,計算引擎則有Hive,Spark以及Presto等。在我看來,這是一個比較典型的資料湖的架構。但是,我們也有很多互動分析的需求,為了解決這些需求,我們會引入特定的分散式數倉,引擎使用的是ClickHouse。

這樣就會引入新的問題。

比如,針對某個資料產品或者資料服務,為了對其提供比較好的查詢效率和效能,需要先把資料從HDFS入倉到ClickHouse,使得整個的流程變長。其次,會帶來資料冗餘的問題,因為資料在HDFS上有一份,在ClickHouse上也有一份。第三,在入倉過程中,可能會對資料做一些操作,使得資料發生變化,導致很難與HFDS裡的其他資料進行關聯,導致資料孤島出現。這兩年,像Iceberg,Hudi以及Delta Lake這種資料湖的儲存格式也逐漸被很多公司引入去解決上述問題。對我們來說,我們主要使用的是Iceberg這種引擎去解決資料湖和資料倉儲之間存在gap的問題。

在我們看來,湖倉一體的目標有三個:

第一,希望它還是像資料湖一樣靈活。主要是我們還是用統一的HDFS儲存,和之前的SQL on Hadoop生態系統無縫相容,包括基於Spark或者Flink ETL資料接入,SQL/ML/DataSet等各層次API存取以及Presto/Spark/Hive等多種計算引擎的支援。

第二,希望它像資料倉儲一樣高效。針對於寫成Iceberg格式的表,我們希望它能夠做到或者接近於專用的分散式數倉的高效查詢效率,包括資料分佈組織、索引、預計算、計算儲存一體化/快取等整體的增強和優化。像Iceberg本身提供了粗粒度這種事務的能力,使得我們能夠擁有支援更多的變化讀寫,實時進實時出倉的額外能力。

第三,希望它能像風一樣自由,這個是對使用者而言,希望可以做到智慧化,使用者的使用門檻更低,易用性更高。之前,使用者想要使其ETL結果的資料分析查詢效率更高的話,需要關注很多方面,比如寫出的資料是不是小檔案會非常多,ETL裡的SQL邏輯怎麼寫使得寫出去的資料怎樣排序,是否需要做預計算等,並且這些方面和使用者的業務邏輯沒有關係。

使用者想追求比較高的查詢效率,這些在原來的方案中,使用者需要考慮這些事情,也就是說,使用者需要自己變成一個巨量資料專家才能解決這些問題,並且還要自己去開發,有額外的ETL任務,因此,對使用者的門檻就比較高。第二步,它需要做到更自動化。對使用者來說,只需要考慮常用的過濾欄位,聚合維度和統計項等是哪些。後臺的這種自動服務可以幫助解決資料的組織、排序等問題。第三階段,希望能做到智慧化。使用者只需要關注表裡有哪些欄位,算什麼型別等業務資訊。後面的這些資料的組織,索引的構建以及預計算等全部都能被自動化。使用者不需要關心這些,只需關注基於某張表,怎麼寫SQL表達業務邏輯就行。

--

02 湖倉一體架構

在嗶哩嗶哩,湖倉一體架構的核心是Iceberg,這是我們在Hudi,Delta Lake以及Iceberg這三個中進行選擇的最終結果。整個的資料處理流程架構大概是這樣的:實時的資料在Kafka裡,通過Flink實施ETL,將資料以Iceberg格式寫到HDFS,離線的資料則通過Spark寫入HDFS。對於使用者來說,資料寫到Iceberg時,我們的後臺有一個自研的Magnus服務,會針對資料落地到HDFS上的Iceberg表進行持續的整體的組織優化。具體如何進行優化,會在下一個部分詳細介紹,主要是運用Spark任務。

在分析端,我們用的是Trino做查詢引擎,它是PrestoSQL改名後的稱呼。同時,我們還用了Alluxio,因為Iceberg裡後設資料以及索引資料相較於原始資料來說,資料量都比較小,所以我們將其快取到Alluxio,方便快取加速。Magnus則是我們基於湖倉一體核心Iceberg的資料管理服務。它主要的任務是對於Iceberg資料做優化和管理,包括基本資訊的展示,比如表/分割區/檔案以及Snapshot等。在其內部還有一個scheduler,用於資料優化作業的排程。無論是離線的批任務,還是實時的任務,把資料寫到Iceberg的表裡的時候,都會把這些commit event傳送到Magnus,裡面會有一個佇列,scheduler會根據制定的特定policy去消費佇列,去決定對哪些Iceberg表做相應工作,拉起對應的Spark任務做具體事情。


之前,基於Iceberg,Hudi或者Delta Lake的湖倉一體架構,大家比較關注或者說應用場景比較多的就是實時數倉。因為它們的對於粗粒度的事務的支援,去解決提供(近)實時數倉的能力。對於我們來說,除了一些比較重要的,比較獨立的資料產品服務我們會放到專門的像ClickHouse這樣的分散式數倉裡去做刪除和查詢。實際上,我們在數倉建設過程中,還是有大量的業務場景還是基於之前的Hadoop的資料湖的架構上,我們數倉開發部門的同學基於這種資料湖架構去做數倉的建設,比如從ods到dwd等不同的分層建模。實際上,大部分數倉建模工作,其資料還是寫到HFDS上,然後應用Presto或者Spark去做分析。

在這種場景之下,我們湖倉一體架構的目標是如何加速查詢效能,使其效率可以達到或者接近專門的分散式數倉那樣。我們分析裡開源的湖倉一體方案以及分散式數倉的效能Gap,這就涉及到Runtime引擎、儲存以及預計算等效能方面上的比較,這裡面涉及到的效能相關的因素非常多。本文主要分享在儲存裡的排序組織以及索引上的一些探索和實踐。為什麼選擇這兩個因素,主要是我們的調研結果認為它們是開源的湖倉一體方案與分散式數倉的效能gap裡最明顯的部分。

--

03 資料的排序組織

首先,我們來看資料的排序組織。

說到典型的資料分析場景,我們這次做的分享是基於star schema 作為一個benchmark的多維分析場景,整個資料模型就是一個事實表外加多個維度表,查詢的模式也是比較固定的。先關聯,再過濾,接著聚合,最後對結果做排序。其中的過濾條件可以是等值過濾也可能是範圍過濾,而過濾欄位可以是高基數位段也可能是低基數位段。

因此,在這種典型的多維分析場景下,也是我們實際業務中會經常遇到的問題是:我們如何在各種型別欄位及各種過濾條件下,執行查詢時唯讀取需要的資料,而不是做全表的掃描?這裡有兩個比較關鍵的點,通過資料的組織外加索引,使得我們只查詢讀取SQL邏輯上所需要的那部分資料。


就索引來說,比如Iceberg,它已經預設提供檔案級別的MinMax索引,在Meta檔案裡,它會記錄每列的Min和Max值。

舉個例子,我們有四個檔案,在Meta檔案裡就會記錄相應的Max和Min值,那麼對於下圖的查詢案例,我們可以通過age=17對檔案進行過濾,按照age對資料檔案之間的資料進行排序,那麼在這個查詢中,讀取時的過濾效果會非常好,可以不用讀取其中的三個檔案,因為通過Iceberg的Meta檔案,可以判斷出來我們只需要讀取檔案2,提高了查詢效率。


但是,我們做好排序之後,比如說我們另外有一個查詢,需要根據obd這個欄位去做過濾,這時候就會產生問題。通常,使用者的查詢裡過濾的欄位不止一個,我們在做排序設定的時候,當全域性依次用a,b,c三個欄位做排序,對a的資料聚集性是最好的,越往後的欄位聚集性越不好。如果欄位a的基數比較高,那麼對於欄位b,甚至後面的欄位c等,可能就完全沒有過濾效果。在用這些欄位進行具體查詢時,過濾就基本無效。

這是MinMax索引排序經常會遇到的一個問題,一種解決方案是使用projection的方式——按照另外欄位再做一次排序來儲存資料。而在嗶哩嗶哩,採用的是引入Z-Order排序的方式。

Z-Order具體是什麼呢?舉例來說,我們有a,b,c三個欄位,用這三個欄位進行排序的時候,我們希望同時可以保證它們的聚集性,而不是像global order那樣優先保證a的,其次是b和c的。我們希望在最終的排序結果裡三個欄位都能有一定的聚集性。

具體怎麼做呢?實際上,Z-Order是把天然沒有有序性的多維資料以某種方式對映成一維資料進行比較。對映後的一維資料,能夠保證各個原始維度按照同種程度去保證其聚集性。舉一個簡單的例子,如下圖所示,對X,Y這兩個維度進行位元位的交叉組值,形成了Interleave Index進而得出一個新的值,這個值被稱作Z-Value。從圖中,可以看到針對X,Y這兩個欄位的資料,生成的z-value會呈現出一個Z形巢狀。這樣的一個結構,在進行切分時,能夠同時保證X,Y兩個欄位的聚集性。


這種聚集性,又會帶來一個新的問題:我們支援的資料型別不止Int型別,如何將
Int/Long/String/Date/Timestamp等各種型別資料進行正整型轉化,進行Interleave Index計算以及計算出相應Z-Value?

因此,實現Z-Order 的一個前提是需要保證資料以保序的方式對映成一個正整型。對於Int型別的資料,可以做首位位元逆轉來實現;對於其它型別的資料,實現的方式都不太一樣,比如對String型別,會取固定的前幾位來進行排序。

但是保序對映也存在問題:

第一,從原始值到對映值的過程中可能會丟失數值資訊。比如String型別的資料,如果只取前幾位的話,後面的資訊就丟失掉了。

第二,對映值的分佈無法保證從0開始是正整形,導致z-value不符合Z-Order曲線的巢狀分佈。比如,X的取值是0,1,2,3,4,5,6,7,Y的取值是8,16,24,32這種,計算出來的z-value排序效果實際上和資料按照order by y,x的效果是一樣的。也就是說這種排序並沒有帶來額外的好處,對於X的聚集性無法保證。

因此,我們引入了Boundary-based interleave Index這種計算方式。它主要對Spark RangePartitioner進行了一些改造,實現了一個新的排序方法。以下圖為例,我們需要對city和age兩個欄位進行Z-Order排序,我們對這兩個欄位進行資料取樣,取樣之後,對每個欄位進行排序後再繼續取樣。把boundary取樣出來之後,對於進來的資料,Spark的shuffle partition會把這個值和boundary進行比較,取boundary的index值去進行計算出它的z-value值。因為我們是按照boundary的index值進行計算,所以z-value肯定是從零開始的正整型。

下圖是Z-Order的一個效果呈現,具體來說,我們對所示的三個欄位進行Z-Order排序,然後我們發現它可以做到百分之八十多的data skipping,即百分之八十的資料在在查詢時不用被讀取。

另外,我們還支援了基於Hibert曲線的排序,Z-Order排序存在一個缺陷就是它會存在跨度較大的連線線,這樣會導致在檔案切割時,如果大跨度連線線被包含在某個檔案裡時,會導致這個檔案的Min,Max跨度很大,data skippingde效果就大打折扣。Hibert曲線就不存在連線線跨度非常大的問題,效果也就比Z-Order更好,如下圖所示。

我們支援了Z-Order排序組織之後,再加上MinMax索引,它們在Star Schema Benchmark(SSB)的效果如下圖所示。可以發現檔案讀取數量和查詢速度都有非常大的提升。但是Z-Order排序欄位越多,排序效果也會越差,因此我們建議2-4個。如果不進行資料的組織排序,MinMax索引的過濾效果就會非常有限。

--

04 索引的增強

針對Iceberg,我們引入了基於檔案級別的BloomFilter索引的支援,每個表的每個欄位可以建立BloomFilter過濾器。針對SSB,我們增加了兩個額外測試,一個是等值的資料查詢,另一個則是範圍過濾的資料查詢。如圖所示,加了BloomFilter後的等值資料查詢,讀取的檔案數量大大減少,查詢速度也有很大的提升。但是對於範圍查詢,BloomFilter這種索引並不支援根據範圍過濾條件過濾資料檔案。

因此,我們引入了BitMap索引的支援。

如下圖的totalprice欄位,我們引入BitMap索引,過濾條件為totoalprice小於19時,可以把2和18的BitMap進行一次或運算的操作,進而判斷操作結果是否大於零。大於零就說明這個檔案裡包含所需要的查詢資料,需要讀取這個資料。

BitMap還有一個額外的好處在於:當過濾條件有兩個的時候,我們需要查詢totalprice小於19並且city是「United ST005」的資料。對於這兩個查詢,單獨查詢需要的資料可能在某個檔案裡都存在,但是當我們這對這兩個條件做BitMap且運算後會發現某個檔案裡同時滿足這兩個條件的資料並不存在,因此我們可以不用讀取這個檔案。也就是說,BitMap的交併運算可以更好地在複雜過濾條件的情況下過濾掉更多的資料檔案。


但是BitMap有兩個比較重要的、功能實現上需要解決的問題。

其一,進行範圍過濾時,比如需要查詢price小於51的,需要把2到50的這些資料進行編碼和計算,大量的讀寫和計算會非常影響查詢效率。其二,針對每一個基數,都需要儲存對應的BitMap,儲存的代價比較大,尤其是高基數的欄位。

針對第一個問題,我們引入了Range Encoded的BitMap去解決問題。舉例來說,針對price欄位數值為18的值,我們儲存的不是它的BitMap,而是儲存了它與數值為2的BitMap的或運算結果。簡單說來,如果BitMap欄位裡有一個值是1,那麼其後面的所有值都是1。通過這種編碼的BitMap,我們就可以保證對於範圍查詢而言,我們都可以優化成只需要最多兩個BitMap的值就能取出任意條件範圍內的值。

為了解決第二個問題,我們引入了Bit-Slice Encoded的BitMap索引。

在下圖的例子中,我們看到price的取值有2,18,20,33等具體值以及相應的BitMap,對這些具體值進行按位元切分,圖中的「Comp 0」代表第零位。比如對於第零位為8的值,有18和188,那麼就對這兩個值進行BitMap的或運算並且將其結果儲存到Comp 0裡值為8的對應Bit Map裡。對於其它的位,也是以同樣的方式進行儲存。這樣之下,如果我們之前有256個基數,經過Bit-SLice Encoded之後,我們就只需要30個BitMap了,而不是之前的256個BitMap。

我們還可以把Bit-Slice Encoded的BitMap從十進位制進化成二進位制的位數表示,此處不做詳細介紹。

總的來說,我們可以把Bit-Slice Encoded和Range Encoded的BitMap進行結合,對於二進位制的Bit-Slice Range Encoded的BitMap,可以把256個基數的BitMap轉化成只需要9個BitMap的結果。

基於以上兩點,BitMap很好的解決了沒有排序的資料組織中的高基數的範圍查詢的問題。它的SSB結果中顯示查詢效率有1-10倍的提升,讀取檔案數量則有0-400倍的減少。也就是說,不僅是查詢效能的提升,也能使計算引擎的負載有很大程度的減少,硬體資源可以有更多的儲存。

以上這些工作,不僅是在Iceberg這個專案以及相關Spark專案上做了對應的擴充套件和改造去實現的,也有SQL層面上的介面擴充套件。比如支援新的API,包括在Iceberg裡,對Spark 3進行的語法擴充套件,通過distributed by支援檔案間的排序,排序方式有雜湊, Range, Z-Order以及Hibert曲線。Locally ordered by則是支援檔案內的排序。也就是說使用者可以自定義檔案間和檔案內的排序方式。後續的就是Magnus通過optimizer具體做相應資料優化任務。我們支援上文提到過的BitMap和BloomFilter檔案級別的索引,通過Iceberg的Actions可以拉起相應Spark任務,去做對應的寫索引和刪除索引的操作。

通過對於資料的排序的組織,以及索引的支援,我們也總結了在多維分析的資料場景下的設定策略,如下圖所示。這些策略使得我們能夠支援任意多欄位,任意過濾型別,在絕大部分多維分析場景下,只存取儘量少的檔案,加速查詢。


今天的分享就到這裡,謝謝大家。
本文首發於微信公眾號「DataFunTalk」。