本文是閱讀 LinkedIn 公司2020年發表的論文 Magnet: Push-based Shuffle Service for Large-scale Data Processing 一點筆記。
以上圖為例,在一個DAG的執行圖中,節點與節點之間的資料交換就是Shuffle的過程。雖然Shuffle的過程很簡單,但是不同的引擎有不同的實現。
以shuffle資料傳輸的媒介來看
而基於本地磁碟的Shuffle實現中又有很多種不同的實現
以部署方式來看
co-locate
_也許_不再那麼重要。
這裡再大致介紹下spark原生的external sort shuffle的詳細流程
在LinkedIn公司主要採用了Spark自帶的基於Yarn的External sorted shuffle實現,主要遇到痛點:
map 和 reduce task之間需要維護all-to-all 的連結,以M個Map端task,R和Reducer端task為例,理論上就會建立M * R 個connection。
在實際實現中,一個executor上的reducer可以共用一個和ess的tcp連結。因此實際上的連結數是和executor個數 E 和ess節點數 S相關。但是在生產叢集中 E 和 S 可能都會達到上千,這時連結數就會非常的客觀,很容易帶來穩定性的問題,如果建立連結失敗可能會導致相關stage進行重跑,失敗代價很高。
從上面的讀取流程我們可以看到因為多個reduce task資料在同一個檔案中,很容易產生隨機讀取的問題,並且從linkedin公司觀察到的這些block通常都比較小,平均只有10KB。而LinkedIn shuffle叢集主要使用的HDD磁碟,這個問題就會更大。並且隨機讀取以及大量的網路小包會帶來效能的損失。
也許我們會想到說是否可以有辦法來通過調參來讓Shuffle Block 變大而減輕隨機小IO的問題呢?比如把reduce task端的並行調小,這樣每個task的資料量必然就變大了。
論文中也對此做了闡述,沒法通過簡單的調整reduce task的並行來增大shuffle block size的大小。
假設有一個M個mapper,R個reducer的任務,總的shuffle資料量為D。為了保持每個task處理的資料量恆定,當總資料量增長的時候,map和reduce的並行都要等比增長。
而shuffle block 大小就是 , 為什麼 是 呢,從上面的流程中可以看到每個map端可以近似看做是維護了R個reduce的block。所以總的block數是 。
那麼當資料量增長時,並且為了保證每個task處理的資料量恆定,即效能不下降,那麼shuffle block size必然會減小。最後也因為reduce端資料分散在所有的map端的task,導致不太能利用data locality的特性。
總體架構
Mapper 端的shuffle資料會push到遠端的 shuffle service,並按照reduce端合併成一個檔案。這樣shuffle 檔案的大小就可以提高到MB級別。
這裡Magnet主要考慮儘可能避免給shuffle service帶來過大的壓力(為了穩定性和可延伸性考慮),因此在Magent中,在mapper端,依然會將shuffle資料,首先儲存到本地,然後再按照以下的演演算法,將shuffle blocks打包成一個個chunks傳送到shuffle service。
計算blocks劃分到chunks演演算法
這個演演算法的含義如下:
k++
演演算法最終輸出的是每個 shuffle service 機器和對應的所需要接收的chunk的集合。
這個演演算法保證,每個chunks只包含一個shuffle file中連續的不同shuffle partition 的 shuffle blocks。當達到一定大小後會另外建立一個chunk。但是不同mapper上的同一個shuffle parititon的資料最終會路由到同一個shuffle service節點上。
並且為了避免同時mapper端都按照同一順序往shuffle service 節點寫資料造成擠兌和merge時的檔案並行鎖,所以在mapper端處理chunk的順序上做了隨機化。
在完成打包chunk和隨機化之後,就交由一個專門的執行緒池來將資料從按照chunk順序從本地磁碟load出來,所以這裡就是順序的讀取本地磁碟再push到遠端的shuffle service。Push操作是和Mapper端的task解耦的,push操作失敗不會影響map端的task。
當magnet收到打包傳送來的chunks,首先會根據block的後設資料獲取他的分割區資訊,然後根據shuffle service本地維護的後設資料做處理,shuffle service本地為每個Shuffle partition (reduce partition)維護了以下元資訊
這樣首先可以根據傳送來的shuffle blocks的後設資料判斷資料是否已經merge過了,避免重複儲存。通過currentMapId來避免多個mapper端資料同時往一個檔案merge的問題,而position offset 則可以用作在merge 失敗的時候可以依舊保持檔案能讀到最近一次成功的位置。下一次重寫的時候會依舊從position offset進行覆蓋寫入。通過這幾個後設資料管理,就可以很優雅的處理在檔案merge過程中的寫重複,寫衝突和寫失敗的問題。
在Magent的設計中,push/merge的失敗,並不會影響整個任務的流程,可以fallback到讀取mapper端未merge的資料。
我理解要實現這樣的目的,原始資料就需要被保留,所以可以看到在架構圖中Magent Shuffle Service實際上會和executor一起部署(還支援其他的部署形式)。在executor端作為external shuffle service的角色存在,mapper端的資料產出完之後就由原生的shuffle service 節點託管了。所以他可以在以上2、3兩種失敗場景下提供fallback的讀取能力。
同時資料是否Merge完的資訊是在Spark Driver中通過MapStatus
和 MergeStatus
兩個結構來進行維護的,下游讀取資料時就是由driver來進行是否fallback的邏輯。
從整體上看Push/Merge 的操作可以理解為完全由Magent Shuffle Service節點託管的資料搬遷合併的動作(將各個mapper處的資料搬遷合併成redcuer端的資料),通過資料寫兩次的行為使得mapper端寫資料和合並解耦,並且在fault tolerance的設計中也利用了寫兩次這個行為所帶來的備份的好處。
同時我們需要關注到雖然通過這個操作,將mapper端的隨機讀取轉化成了順序讀取,但是在shuffle service時merge時,其實還是random write,這在資料重組的過程中是必然的。但是由於os cache 和 disk buffer的存在,會使得random write的吞吐比random read的吞吐大很多。
Magnet支援兩種模式的部署
在on-perm的叢集中,Spark driver可以很好的利用data locality的特性,在push/merge節點結束後,可以將reduce task儘可能排程到資料所在的節點上,可以直接讀取本地資料,效率更高,減少了網路的傳輸也不容易失敗。
因為Spark計算引擎是BSP模型,所以在map端階段全部完成之前reduce端不會開始計算,因此在Push/Megre階段,為了防止部分Push/Merge較慢影響下游reduce task開始執行。Magnet支援了最大的超時機制,利用上面提到的fallback行為,在超時之後就標記該map端的分割區為unmerged,這樣就跳過了這部分慢節點,直接開始reduce階段。
而針對資料傾斜場景,為了避免reduce端合併的檔案過大,這時Magent的解法是和Spark的Adaptive execution 相結合,根據執行時採集到的每個block的大小,當block 大於某個閾值時,就在合併chunk的階段跳過這種block,還是通過fallback行為直接讀取原來mapper端較大的資料塊
在Hadoop的Map-Reduce模型中,通過 "Slow start" 技術可以在Map task都完成之前,部分Reduce task可以先開始進行資料預拉,實現了比較有限的並行化
而在Spark中,通過資料拉取和資料處理的執行緒解耦,這兩者有點類似於一組生產者和消費者。
而在Magnet中也採用了類似的技術,在mapper端Push task 和 mapper task解耦,但是這裡不太理解這個mapper端解耦的收益,因為本身就是在mapper task結束之後才開始進行push task,也就不存在計算執行緒和io執行緒並行的說法。可以理解的是可以通過這個方式和mapper task的框架執行緒解耦。
然後在reduce端,為了最大化並行讀取的能力,不會將reduce端的資料只合併成一個檔案,而是切成多個MB大小的slice,然後reduce task可以發起並行讀取的請求最大化的提高吞吐。
從上面可以看出Magent的幾個設計宗旨
很多系統設計最後對於系統的測試設計其實也很有看點。在論文裡提到了Magent採用了模擬和生產叢集兩個模式來最終衡量新的Shuffle Service的效果。
Magnet 開發了一個分散式的壓測框架,主要可以模擬以下幾個維度
並且可以模擬fetch和push的請求
那衡量的指標有哪些
其他的指標資料就不一一列舉了,可以檢視原文相關章節獲取
最後上線後的優化效果
Figure 1: Shuffle locality ratio increase over past 6 months
https://mp.weixin.qq.com/s/8Fhn24vbZdt6zmCZRvhdOg Magent shuffle 解讀
https://zhuanlan.zhihu.com/p/397391514 Magnet shuffle解讀
https://zhuanlan.zhihu.com/p/67061627 spark shuffle 發展
https://mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw
https://mp.weixin.qq.com/s/2yT4QGIc7XTI62RhpYEGjw
https://www.databricks.com/session_na21/magnet-shuffle-service-push-based-shuffle-at-linkedin
https://issues.apache.org/jira/browse/SPARK-30602
https://www.linkedin.com/pulse/bringing-next-gen-shuffle-architecture-data-linkedin-scale-min-shen
本文來自部落格園,作者:Aitozi,轉載請註明原文連結:https://www.cnblogs.com/Aitozi/p/16813183.html