取代Python多進程!高效能分佈式執行框架 - Berkeley Ray

2020-08-09 10:22:14

在这里插入图片描述

前言

隨着機器學習演算法和技術的進步,出現了越來越多需要在多臺機器並行計算的機器學習應用。然而,在叢集計算裝置上執行的機器學習演算法目前仍是專門設計的。儘管對於特定的用例而言(如參數伺服器或超參數搜尋),這些解決方案的效果很好,同時 AI 領域之外也存在一些高品質的分佈式系統(如 Hadoop 和 Spark),但前沿開發者們仍然常常需要從頭構建自己的系統,這意味着需要耗費大量時間和精力。

例如,應用一個簡單概唸的演算法,如在強化學習中的進化策略(論文《Evolution Strategies as a Scalable Alternative to Reinforcement Learning》)。演算法包含數十行虛擬碼,其中的 Python 實現也並不多。然而,在較大的機器或叢集上執行它需要更多的軟體工程工作。作者的實現包含了上千行程式碼,以及必須定義的通訊協定、資訊序列化、反序列化策略,以及各種數據處理策略。

Ray 的目標之一在於:讓開發者可以用一個執行在筆記型電腦上的原型演算法,僅需新增數行程式碼就能輕鬆轉爲適合於計算機叢集執行的(或單個多核心計算機的)高效能分佈式應用。這樣的框架需要包含手動優化系統的效能優勢,同時又不需要使用者關心那些排程、數據傳輸和硬體錯誤等問題。

本文對Ray進行介紹,以幫助大家更快地瞭解Ray是什麼,並且與Native Python進行對比。如有描述不當的地方,歡迎不吝指正。


一、Hello, Ray

1. 簡介

正如前言部分所述,

Ray是UC Berkeley RISELab新推出的高效能分佈式執行框架,它使用了和傳統分佈式計算系統不一樣的架構和對分佈式計算的抽象方式,具有比Spark更優異的計算效能。Ray是一個基於Python的分佈式執行引擎。相同的程式碼可以在單個機器上執行以實現高效的多處理,並且可以在羣集上用於大量的計算。

在这里插入图片描述

讓我們最後再來看一下分佈式執行架構Ray的高效能。如果想看Python的對比,可直接跳過這部分。

2. 效能表現

2.1 可延伸性和表現效能

  • 端到端可延伸性。 GCS 的主要優勢是增強系統的橫向可延伸性。我們可以觀察到幾乎線性的任務吞吐量增長。在 60 節點,Ray 可以達到超過每秒 100 萬個任務的吞吐量,併線性地在 100 個節點上超過每秒 180 萬個任務。最右邊的數據點顯示,Ray 可以在不到一分鐘的時間處理 1 億個任務(54s)。

在这里插入图片描述

  • 全域性排程器的主要職責是在整個系統中保持負載平衡。 Driver 在第一個節點提交了 100K 任務,由全域性排程器平衡分配給 21 個可用節點。

8.png

  • 物件儲存效能。 對於大物件,單一用戶端吞吐量超過了 15GB/s(紅色),對於小物件,物件儲存 IOPS 達到 18K(青色),每次操作時間約 56 微秒。

9.png

2.2 容錯性

  • 從物件失敗中恢復。 隨着 worker 節點被終結,活躍的區域性排程器會自動觸發丟失物件重建。在重建期間,driver 最初提交的任務被擱置,因爲它們的依賴關係不能滿足。但是整體的任務吞吐量保持穩定,完全利用可用資源,直到丟失的依賴項被重建。

10.png

  • 分佈式任務的完全透明容錯。 虛線表示叢集中的節點數。曲線顯示新任務(青色)和重新執行任務(紅色)的吞吐量,到 210s 時,越來越多的節點加回到系統,Ray 可以完全恢復到初始的任務吞吐量。

  • 從 actor 失敗中恢復。 通過將每個 actor 的方法呼叫編碼到依賴關係圖中,我們可以重用同一物件重構機制 機製。

11.png

t=200s 時,我們停止 10 個節點中的 2 個,導致叢集中 2000 個 actor 中的 400 個需要在剩餘節點上恢復。(a)顯示的是沒有中間節點狀態被儲存的極端情況。呼叫丟失的 actor 的方法必須重新序列執行(t = 210-330s)。丟失的角色將自動分佈在可用節點上,吞吐量在重建後完全恢復。(b)顯示的是同樣工作負載下,每 10 次方法呼叫每個 actor 自動進行了一次 checkpoint 儲存。節點失效後,大部分重建是通過執行 checkpoint 任務重建 actor 的狀態(t = 210-270s)。

  • GCS 複製消耗。 爲了使 GCS 容錯,我們複製每個數據庫碎片。當用戶端寫入 GCS 的一個碎片時,它將寫入複製到所有副本。通過減少 GCS 的碎片數量,我們人爲地使 GCS 成爲工作負載的瓶頸,雙向複製的開銷小於 10%。

2.3 RL 應用

我們用 Ray 實現了兩種 RL 演算法,與專爲這兩種演算法設計的系統進行對比,Ray 可以趕上甚至超越特定的系統。除此之外,使用 Ray 在叢集上分佈這些演算法只需要在演算法實現中修改很少幾行程式碼。

  • ES 演算法(Evolution Strategies)

12.png

Ray 和參考系統實現 ES 演算法在 Humanoid v1 任務上達到 6000 分所需時間對比。

在 Ray 上實現的 ES 演算法可以很好地擴充套件到 8192 核,而特製的系統在 1024 核後便無法執行。在 8192 核上,我們取得了中值爲 3.7 分鐘的效果,比目前最好效果快兩倍。

  • PPO 演算法(Proximal Policy Optimization)

爲了評估 Ray 在單一節點和更小 RL 工作負載的效能,我們在 Ray 上實現了 PPO 演算法,與 OpenMPI 實現的演算法進行對比。

13.png

MPI 和 Ray 實現 PPO 演算法在 Humanoid v1 任務上達到 6000 分所需時間對比。

用 Ray 實現的 PPO 演算法超越了特殊的 MPI 實現,並且使用 GPU 更少。


二、WHY: 爲何有這麼高的效能

博主青藤木鳥 Muniao’s blog在試用之後,簡單總結一下:

  1. 極簡 Python API 介面:在函數或者類定義時加上 ray.remote 的裝飾器並做一些微小改變,就能將單機程式碼變爲分佈式程式碼。這意味着不僅可以遠端執行純函數,還可以遠端註冊一個類(Actor模型),在其中維護大量context(成員變數),並遠端呼叫其成員方法來改變這些上下文。
  2. 高效數據儲存和傳輸:每個節點上通過共用記憶體(多進程存取無需拷貝)維護了一塊區域性的物件儲存,然後利用專門優化過的 Apache Arrow格式來進行不同節點間的數據交換。
  3. 動態圖計算模型:這一點得益於前兩點,將遠端呼叫返回的 future 控制代碼傳給其他的遠端函數或者角色方法,即通過遠端函數的巢狀呼叫構建複雜的計算拓撲,並基於物件儲存的發佈訂閱模式來進行動態觸發執行。
  4. 全域性狀態維護:將全域性的控制狀態(而非數據)利用 Redis 分片來維護,使得其他元件可以方便的進行平滑擴充套件和錯誤恢復。當然,每個 redis 分片通過 chain-replica 來避免單點。
  5. 去中心化的排程:排程器分散在各個節點上;根據 GCS 拉取全域性負載狀態資訊,然後隨機選擇一個合乎資源約束的可用節點。

這部分來自作者:青藤木鳥 Muniao’s blog 轉載請註明出處

當然,還有一些需要優化的地方,比如 Job 級別的封裝(以進行多租戶資源配給),待優化的垃圾回收演算法(針對物件儲存,現在只是粗暴的 LRU),多語言支援(最近支援了Java,但不知道好不好用)等等。但是瑕不掩瑜,其架構設計和實現思路還是有很多可以借鑑的地方。

1. 語言和計算模型

Ray 實現了動態任務圖計算模型,即:Ray 將應用建模爲一個在執行過程中動態生成依賴的任務圖。在此模型之上,Ray 提供了角色模型(Actor)並行任務模型(task-parallel) 的程式設計範式。Ray 對混合計算範式的支援使其有別於與像 CIEL 一樣只提供並行任務抽象和像 Orleans 或 Akka 一樣只提供角色模型抽象的系統。

1.1 程式設計模型

  • 任務模型(Tasks)

一個任務表示一個在無狀態工作進程執行的遠端函數(remote function)。當一個遠端函數被呼叫的時候,表示任務結果的 future 會立即被返回(也就是說所有的遠端函數呼叫都是非同步的,呼叫後會立即返回一個任務控制代碼)。可以將 Futures傳給 ray.get() 以阻塞的方式獲取結果,也可以將 Futures 作爲參數傳給其他遠端函數,以非阻塞、事件觸發的方式進行執行(後者是構造動態拓撲圖的精髓)。Futures 的這兩個特性讓使用者在構造並行任務的同時指定其依賴關係。下表是 Ray 的所有 API(相當簡潔而強大,但是實現起來會有很多坑,畢竟所有裝飾有 ray.remote 的函數或者類及其上下文都要序列化後傳給遠端節點,序列化用的和 PySpark 一樣的 cloudpickle)。

在这里插入图片描述

表1 Ray API

遠端函數作用於不可變的物體上,並且應該是無狀態的並且沒有副作用的:這些函數的輸出僅取決於他們的輸入(純函數)。這意味着冪等性(idempotence),獲取結果出錯時只需要重新執行該函數即可,從而簡化容錯設計。

  • 角色模型(Actors)

一個角色物件代表一個有狀態的計算過程。每個角色物件暴露了一組可以被遠端呼叫,並且按呼叫順序依次執行的成員方法(即在同一個角色物件內是序列執行的,以保證角色狀態正確的進行更新)。一個角色方法的執行過程和普通任務一樣,也會在遠端(每個角色物件會對應一個遠端進程)執行並且立即返回一個 future;但不同的是,角色方法會執行在一個有狀態(stateful)的工作進程上。一個角色物件的控制代碼(handle)可以傳遞給其他角色物件或者遠端任務,從而使他們能夠在該角色物件上呼叫這些成員函數。

在这里插入图片描述

表2 任務模型 vs. 角色模型的對比

表2 比較了任務模型和角色模型在不同維度上的優劣。任務模型利用叢集節點的負載資訊和依賴數據的位置資訊來實現細粒度的負載均衡,即每個任務可以被排程到儲存了其所需參數物件的空閒節點上;並且不需要過多的額外開銷,因爲不需要設定檢查點和進行中間狀態的恢復。與之相比,角色模型提供了極高效的細粒度的更新支援,因爲這些更新作用在內部狀態(即角色成員變數所維護的上下文資訊)而非外部物件(比如遠端物件,需要先同步到本地)。後者通常來說需要進行序列化和反序列化(還需要進行網路傳輸,因此往往很費時間)。例如,角色模型可以用來實現參數伺服器(parameter servers)和基於GPU 的迭代式計算(如訓練)。此外,角色模型可以用來包裹第三方模擬器(simulators)或者其他難以序列化的物件(比如某些模型)。

爲了滿足異構性和可延伸性,我們從三個方面增強了 API 的設計。首先,爲了處理長短不一的併發任務,我們引入了 ray.wait() ,它可以等待前 k 個結果滿足了就返回;而不是像 ray.get() 一樣,必須等待所有結果都滿足後才返回。其次,爲了處理對不同資源緯度( resource-heterogeneous)需求的任務,我們讓使用者可以指定所需資源用量(例如裝飾器:ray.remote(gpu_nums=1)),從而讓排程系統可以高效的管理資源(即提供一種互動手段,讓排程系統在排程任務時相對不那麼盲目)。最後,爲了提靈活性,我們允許構造巢狀遠端函數(nested remote functions),意味着在一個遠端函數內可以呼叫另一個遠端函數。這對於獲得高擴充套件性是至關重要的,因爲它允許多個進程以分佈式的方式相互呼叫(這一點是很強大的,通過合理設計函數,可以使得可以並行部分都變成遠端函數,從而提高並行性)。

1.2 計算模型

Ray 採用的動態圖計算模型,在該模型中,當輸入可用(即任務依賴的所有輸入物件都被同步到了任務所在節點上)時,遠端函數和角色方法會自動被觸發執行。在這一小節,我們會詳細描述如何從一個使用者程式(圖3)來構建計算圖(圖4)。該程式使用了表1 的API 實現了圖2 的僞碼。

@ray.remote
def create_policy():
# Initialize the policy randomly. return policy
@ray.remote(num_gpus=1)
class Simulator(object):
  def __init__(self):
  # Initialize the environment. self.env = Environment()
    def rollout(self, policy, num_steps):
      observations = []
      observation = self.env.current_state()
      for _ in range(num_steps):
        action = policy(observation)
        observation = self.env.step(action)
        observations.append(observation)
      return observations

@ray.remote(num_gpus=2)
def update_policy(policy, *rollouts):
  # Update the policy.
  return policy

@ray.remote
def train_policy():
  # Create a policy.
  policy_id = create_policy.remote()
  # Create 10 actors.
  simulators = [Simulator.remote() for _ in range(10)] # Do 100 steps of training.
  for _ in range(100):
      # Perform one rollout on each actor.
      rollout_ids = [s.rollout.remote(policy_id)
                     for s in simulators]
      # Update the policy with the rollouts.
      policy_id =
          update_policy.remote(policy_id, *rollout_ids)
   return ray.get(policy_id)

圖3:在 Ray 中實現圖2邏輯的程式碼,注意裝飾器 @ray.remote 會將被註解的方法或類宣告爲遠端函數或者角色物件。呼叫遠端函數或者角色方法後會立即返回一個 future 控制代碼,該控制代碼可以被傳遞給隨後的遠端函數或者角色方法,以此來表達數據間的依賴關係。每個角色物件包含一個環境物件 self.env ,這個環境狀態爲所有角色方法所共用。

在不考慮角色物件的情況下,在一個計算圖中有兩種型別的點:數據物件(data objects)和遠端函數呼叫(或者說任務)。同樣,也有兩種型別的邊:數據邊(data edges)和控制邊(control edges)。數據邊表達了數據物件任務間的依賴關係。更確切來說,如果數據物件 D 是任務 T 的輸出,我們就會增加一條從 T 到 D 的邊。類似的,如果 D是 任務 T 的輸入,我們就會增加一條 D 到 T 的邊。控制邊表達了由於遠端函數巢狀呼叫所造成的計算依賴關係,即,如果任務 T1 呼叫任務 T2, 我們就會增加一條 T1 到 T2 的控制邊。

在計算圖中,角色方法呼叫也被表示成了節點。除了一個關鍵不同點外,他們和任務呼叫間的依賴關係基本一樣。爲了表達同一個角色物件上的連續方法呼叫所形成的狀態依賴關係,我們向計算圖新增第三種類型的邊:在同一個角色物件上,如果角色方法 Mj 緊接着 Mi 被呼叫,我們就會新增一條 Mi 到 Mj 的狀態邊(即 Mi 呼叫後會改變角色物件中的某些狀態,或者說成員變數;然後這些變化後的成員變數會作爲 Mj 呼叫的隱式輸入;由此,Mi 到 Mj 間形成了某種隱式依賴關係)。這樣一來,作用在同一角色物件上的所有方法呼叫會形成一條由狀態邊串起來的呼叫鏈(chain,見圖4)。這條呼叫鏈表達了同一角色物件上方法被呼叫的前後相繼的依賴關係。

在这里插入图片描述

圖3:該圖與圖4 train_policy.remote() 呼叫相對應。遠端函數呼叫和角色方法呼叫對應圖中的任務(tasks)。該圖中顯示了兩個角色物件A10和A20,每個角色物件的方法呼叫(被標記爲 A1i 和 A2i 的兩個任務)之間都有狀態邊(stateful edge)連線,表示這些呼叫間共用可變的角色狀態。從 train_policy 到被其呼叫的任務間有控制邊連線。爲了並行地訓練多種策略,我們可以呼叫 train_policy.remote()多次。

狀態邊讓我們將角色物件嵌入到無狀態的任務圖中,因爲他們表達出了共用狀態、前後相繼的兩個角色方法呼叫之間的隱式數據依賴關係。狀態邊的新增還可以讓我們維護譜系圖(lineage),如其他數據流系統一樣,我們也會跟蹤數據的譜系關係以在必要的時候進行數據的重建。通過顯式的將狀態邊引入數據譜系圖中,我們可以方便的對數據進行重建,不管這些數據是遠端函數產生的還是角色方法產生的(小節4.2.3中會詳細講)。

2. 架構

Ray是使用什麼樣的架構對分佈式計算做出如上抽象的呢?下圖給出了Ray的系統架構。(來自Ray論文,Click here

在这里插入图片描述

圖4 Ray的架構圖

作爲分佈式計算系統,Ray仍舊遵循了典型的Master-Slave的設計:Master負責全域性協調和狀態維護,Slave執行分佈式計算任務。不過和傳統的分佈式計算系統不同的是,Ray使用了混合任務排程的思路。在叢集部署模式下,Ray啓動了以下關鍵元件:

  • GlobalScheduler: Master上啓動了一個全域性排程器,用於接收本地排程器提交的任務,並將任務分發給合適的本地任務排程器執行。
  • RedisServer: Master上啓動了一到多個RedisServer用於儲存分佈式任務的狀態資訊(ControlState),包括物件機器的對映、任務描述、任務debug資訊等。
  • LocalScheduler: 每個Slave上啓動了一個本地排程器,用於提交任務到全域性排程器,以及分配任務給當前機器的Worker進程。
  • Worker: 每個Slave上可以啓動多個Worker進程執行分佈式任務,並將計算結果儲存到ObjectStore。
  • ObjectStore: 每個Slave上啓動了一個ObjectStore儲存只讀數據物件,Worker可以通過共用記憶體的方式存取這些物件數據,這樣可以有效地減少記憶體拷貝和物件序列化成本。ObjectStore底層由Apache Arrow實現。
  • Plasma(現在改名爲arrow):每個Slave上的ObjectStore都由一個名爲Plasma的物件管理器進行管理,它可以在Worker存取本地ObjectStore上不存在的遠端數據物件時,主動拉取其它Slave上的物件數據到當前機器。

需要說明的是,Ray的論文中提及,全域性排程器可以啓動一到多個,而目前Ray的實現文件裡討論的內容都是基於一個全域性排程器的情況。我猜測可能是Ray尚在建設中,一些機制 機製還未完善,後續讀者可以留意此處的細節變化。

Ray的任務也是通過類似Spark中Driver的概唸的方式進行提交的,有所不同的是:

  1. Spark的Driver提交的是任務DAG,一旦提交則不可更改。
  2. 而Ray提交的是更細粒度的remote function,任務DAG依賴關係由函數依賴關係自由定製。

論文給出的架構圖裏並未畫出Driver的概念,因此我在其基礎上做了一些修改和擴充。

在这里插入图片描述

圖5 Ray的任務

Ray的Driver節點和和Slave節點啓動的元件幾乎相同,不過卻有以下區別:

  1. Driver上的工作進程DriverProcess一般只有一個,即使用者啓動的PythonShell。Slave可以根據需要建立多個WorkerProcess。
  2. Driver只能提交任務,卻不能接收來自全域性排程器分配的任務。Slave可以提交任務,也可以接收全域性排程器分配的任務。
  3. Driver可以主動繞過全域性排程器給Slave發送Actor呼叫任務(此處設計是否合理尚不討論)。Slave只能接收全域性排程器分配的計算任務。

3. Ray 高階庫

  • Tune: Scalable Hyperparameter Tuning 可伸縮超參數調整
  • RLlib: Scalable Reinforcement Learning 可延伸的強化學習
  • RaySGD: Distributed Training Wrappers 分佈式培訓包裝
  • Ray Serve: Scalable and Programmable Serving 可延伸和可程式化服務

3.1 Tune

Tune是用於任何規模的超參數調整的庫。

  • 用不到10行程式碼啓動多節點分佈式超參數掃描。
  • 支援任何深度學習框架,包括PyTorch,PyTorch Lightning,TensorFlow和Keras。
  • 使用TensorBoard視覺化結果。
  • 在可延伸的SOTA演算法中進行選擇,例如基於人口的培訓(PBT),Vizier的中值停止規則,HyperBand / ASHA。
  • Tune與許多優化庫(例如Facebook Ax,HyperOpt和貝葉斯優化)整合在一起,可以透明地擴充套件它們。

下面 下麪的這個例子,執行並行網格搜尋以優化範例目標函數。要執行這個例子,先執行以下這條指令。

$ pip install ray[tune]
from ray import tune


def objective(step, alpha, beta):
    return (0.1 + alpha * step / 100)**(-1) + beta * 0.1


def training_function(config):
    # Hyperparameters
    alpha, beta = config["alpha"], config["beta"]
    for step in range(10):
        # Iterative training function - can be any arbitrary training procedure.
        intermediate_score = objective(step, alpha, beta)
        # Feed the score back back to Tune.
        tune.report(mean_loss=intermediate_score)

analysis = tune.run(
    training_function,
    config={
        "alpha": tune.grid_search([0.001, 0.01, 0.1]),
        "beta": tune.choice([1, 2, 3])
    })

print("Best config: ", analysis.get_best_config(metric="mean_loss"))

# Get a dataframe for analyzing trial results.
df = analysis.dataframe()

如果已安裝TensorBoard,則自動顯示所有試用結果:

tensorboard --logdir ~/ray_results

3.2 RLlib

RLlib是在Ray之上構建的用於增強學習的開源庫,它爲各種應用程式提供高可伸縮性和統一的API。

pip install tensorflow  # or tensorflow-gpu
pip install ray[rllib]  # also recommended: ray[debug]
import gym
from gym.spaces import Discrete, Box
from ray import tune

class SimpleCorridor(gym.Env):
    def __init__(self, config):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        self.observation_space = Box(0.0, self.end_pos, shape=(1, ))

    def reset(self):
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        return [self.cur_pos], 1 if done else 0, done, {}

tune.run(
    "PPO",
    config={
        "env": SimpleCorridor,
        "num_workers": 4,
        "env_config": {"corridor_length": 5}})

3.3 Ray Serve

Ray Serve是基於Ray構建的可伸縮模型服務庫, 它有以下特點:

  • 框架不可知(Framework Agnostic):使用相同的工具包即可提供服務,從使用PyTorch或Tensorflow&Keras等框架構建的深度學習模型到Scikit-Learn模型或任意業務邏輯。
  • Python優先(Python First):使用純Python程式碼設定服務的模型-不再需要YAML或JSON設定。
  • 面向效能(Performance Oriented):啓用批次處理,流水線處理和GPU加速,以提高模型的吞吐量。
  • 本機組合(Composition Native):允許您將多個模型組合在一起以建立單個預測,從而建立「模型管道」。
  • 水平可延伸(Horizontally Scalable):服務可以隨着您新增更多計算機而線性擴充套件。 使您的基於ML的服務能夠處理不斷增長的流量。

下面 下麪這個範例執行一個scikit-learn梯度提升分類器。在執行前需要執行以下語句:

$ pip install scikit-learn
$ pip install "ray[serve]"
from ray import serve
import pickle
import requests
from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier

# Train model
iris_dataset = load_iris()
model = GradientBoostingClassifier()
model.fit(iris_dataset["data"], iris_dataset["target"])

# Define Ray Serve model,
class BoostingModel:
    def __init__(self):
        self.model = model
        self.label_list = iris_dataset["target_names"].tolist()

    def __call__(self, flask_request):
        payload = flask_request.json["vector"]
        print("Worker: received flask request with data", payload)

        prediction = self.model.predict([payload])[0]
        human_name = self.label_list[prediction]
        return {"result": human_name}


# Deploy model
serve.init()
serve.create_backend("iris:v1", BoostingModel)
serve.create_endpoint("iris_classifier", backend="iris:v1", route="/iris")

# Query it!
sample_request_input = {"vector": [1.2, 1.0, 1.1, 0.9]}
response = requests.get("http://localhost:8000/iris", json=sample_request_input)
print(response.text)
# Result:
# {
#  "result": "versicolor"
# }

三、Have A TRY

Ray是一個基於Python的分佈式執行引擎。相同的程式碼可以在單個機器上執行以實現高效的多處理,並且可以在羣集上用於大量的計算。

使用Ray時,涉及以下幾個過程:

  • 多個工作進行執行任務,並將結果村儲存在物件庫中,每個進程是一個獨立的處理單位。
  • 每個節點的儲存不可變的物件在共用記憶體中,並允許進程在相同節點上高效複製和反序列化物件
  • 一個全域性排程器排程接收任務,並將它們分配到其他地方節點執行
  • 一個driver是使用者控制的python程式。例如,如果使用者正在執行指令碼或使用python shell,那麼driver就是執行的指令碼或者python進程。driver與工作程式類似,都可以將任務提交給本地排程程式,並從物件儲存中獲取物件,但不同之處在於本地排程程式不會講任務分配給要執行的driver
  • 一個Redis伺服器維護大量的系統狀態,例如,他跟蹤哪些物件在哪些機器上以及任務規範(而不是數據)上,他可以直接用於偵錯目的的查詢。

NOTE: As of Ray 0.8.1, Python 2 is no longer supported.

1. 簡單開始

並行執行Python函數。

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))

要使用Ray的角色模型(Actors):

import ray
ray.init()

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))

Ray程式可以在單台計算機上執行,也可以無縫擴充套件到大型羣集。 要在雲中執行上述Ray指令碼,只需下載這個檔案並執行:

ray submit [CLUSTER.YAML] example.py --start

2. 模擬器的虛構範例

僅用遠端函數和上述的任務所無法完成的一件事是在相同的共用可變狀態上執行多個任務。這在很多機器學習場景中都出現過,其中共用狀態可能是模擬器的狀態、神經網路的權重或其它。Ray 使用 actor 抽象以封裝多個任務之間共用的可變狀態。以下是關於 Atari 模擬器的虛構範例:

import gym
@ray.remote
class Simulator(object):
 def __init__(self):
     self.env = gym.make("Pong-v0")
     self.env.reset()
 def step(self, action):
     return self.env.step(action)
# Create a simulator, this will start a remote process that will run
# all methods for this actor.
simulator = Simulator.remote()
observations = []
for _ in range(4):
 # Take action 0 in the simulator. This call does not block and
 # it returns a future.
 observations.append(simulator.step.remote(0))

Actor 可以很靈活地應用。例如,actor 可以封裝模擬器或神經網路策略,並且可以用於分佈式訓練(作爲參數伺服器),或者在實際應用中提供策略。

在这里插入图片描述

圖6 左:actor 爲用戶端進程提供預測/操作。 右:多個參數伺服器 actor 使用多個工作進程執行分佈式訓練。

3. 參數伺服器範例

一個參數伺服器可以作爲一個 Ray actor 按如下程式碼實現:

@ray.remote
class ParameterServer(object):
 def __init__(self, keys, values):
     # These values will be mutated, so we must create a local copy.
     values = [value.copy() for value in values]
     self.parameters = dict(zip(keys, values))
 def get(self, keys):
     return [self.parameters[key] for key in keys]
 def update(self, keys, values):
     # This update function adds to the existing values, but the update
     # function can be defined arbitrarily.
     for key, value in zip(keys, values):
         self.parameters[key] += value

這裏有更完整的範例:http://ray.readthedocs.io/en/latest/example-parameter-server.html

執行以下程式碼初始化參數伺服器:

parameter_server = ParameterServer.remote(initial_keys, initial_values)

執行以下程式碼,建立 4 個長時間執行的持續恢復和更新參數的工作進程:

@ray.remote
def worker_task(parameter_server):
 while True:
     keys = ['key1', 'key2', 'key3']
     # Get the latest parameters.
     values = ray.get(parameter_server.get.remote(keys))
     # Compute some parameter updates.
     updates =# Update the parameters.
     parameter_server.update.remote(keys, updates)
# Start 4 long-running tasks.
for _ in range(4):
 worker_task.remote(parameter_server)

四、Ray V.S. Python

概念插播:不可變遠端物件


在Ray中,我們可以建立和計算物件。我們將這些物件稱爲遠端物件使用物件ID來參照它們。遠端物件儲存在物件儲存中,並且羣集中每個節點都有一個物件儲存。在叢集設定中,我們可能實際上並不知道每個物件所在的機器。一個物件ID本質上是一個唯一的ID可以被用來指代一個遠端物件。如果您對Futures熟悉,我們的物件ID在概念上是相似的。


我們假設遠端物件是不可變的。也就是說,它們的值在建立後不能改變。這允許遠端物件在多個物件儲存中被複制,而不需要同步副本。

4.1 Put 和 Get

命令ray.get和ray.put可用於Python物件之間進行轉換和物件ID,如示於以下的例子。

x  =  "example"
ray.put (x )  #ObjectID(b49a32d72057bdcfc4dda35584b3d838aad89f5d)

該命令ray.put(x)將由工作進程或驅動程式進程執行(驅動程式進程是執行指令碼的進程)。它需要一個Python物件,並將其複製到本地物件儲存區(這裏的本地手段在同一個節點上)。一旦物件被儲存在物件儲存中,其值就不能被改變。

另外,ray.put(x)返回一個物件ID,它本質上是一個可以用來參照新建立的遠端物件的ID。如果我們把物件ID儲存在一個變數中,那麼我們就可以傳入遠端函數,這些遠端函數將在相應的遠端物件上執行

ray.x_id = ray.put(x)

該命令ray.get(x_id)獲取一個物件ID,並從相應的遠端物件中建立一個Python物件。對於像陣列這樣的物件,我們可以使用共用記憶體,避免複製物件。對於其他物件,這將物件從物件儲存複製到工作進程的堆。如果與物件ID相對應的遠端物件x_id不是與呼叫的worker相同的節點上ray.get(x_id),則遠端物件將首先從具有該遠端物件的物件庫轉移到需要它的物件庫。

x_id  =  ray.get("example")
ray.get(x_id )  #「example」

如果與物件ID對應的遠端物件x_id尚未建立,則該命令ray.get(x_id)將等待,直到建立遠端物件。

一個非常常見的用例ray.get是獲取物件ID的列表。在這種情況下,你可以呼叫ray.get(object_ids), 其中object_ids的物件ID的列表。

result_ids  =  [ ray.put(i) for i in range(10)] 
ray.get(result_ids)[0123456789]

4.2 Ray 中的非同步計算

Ray允許任意Python函數非同步執行。這是通過將Python函數指定爲遠端函數來完成的。

例如,一個普通的Python函數看起來像這樣。

def  add1 (a , b )return  a  +  b

一個遠端函數看起來像這樣。

@ray.remote 
def  add2 (a , b )return  a  +  b

4.3 遠端功能

然而呼叫返回並導致Python直譯器阻塞,直到計算完成,呼叫 立即返回一個物件ID並建立一個任務。該任務將由系統排程並非同步執行(可能在不同的機器上)。當任務完成執行時,其返回值將被儲存在物件儲存中。

x_id  =  add2.remote(1 , 2)
ray.get(x_id )3

以下簡單範例演示瞭如何使用非同步任務來並行化計算。

import time

def  f1():
    time.sleep(1)

@ray.remote 
def f2():
    time.sleep(1)

#以下需要十秒。
[f1() for _ in range(10)]

#以下需要一秒(假設系統至少有10個CPU)。
ray.get([ f2.remote() for _ in range(10)])

提交任務和執行任務之間存在明顯的區別。當呼叫遠端函數時,執行該函數的任務將被提交給本地排程程式,並立即返回任務輸出的物件ID。但是,在系統實際上在工作人員上安排任務之前,任務不會被執行。任務執行不是懶惰地完成的。系統將輸入數據移動到任務中,一旦輸入相關性可用並且有足夠的資源進行計算,任務將立即執行。

提交任務時,每個參數可以通過值或物件ID傳入。例如,這些行具有相同的行爲。

add2.remote(1, 2)
add2.remote(1, ray.put(2))
add2.remote(ray.put(1), ray.put(2))

遠端函數永遠不會返回實際值,它們總是返回物件ID。

當遠端函數被實際執行時,它對Python物件進行操作。也就是說,如果使用任何物件ID呼叫遠端函數,系統將從物件儲存中檢索相應的物件。

請注意,遠端函數可以返回多個物件ID。

@ray.remote(num_return_vals=3)
def return_multiple():
    return 1, 2, 3

a_id, b_id, c_id = return_multiple.remote()

4.4 表達任務之間的依賴關係

程式設計師可以通過將一個任務的物件ID輸出作爲參數傳遞給另一個任務來表達任務之間的依賴關係。例如,我們可以啓動三個任務,每個任務都依賴於前一個任務。

@ray.remote
def f(x):
    return x + 1

x = f.remote(0)
y = f.remote(x)
z = f.remote(y)
ray.get(z) # 3

上面的第二個任務將不會執行,直到第一個任務完成,第三個任務將不會執行直到第二個任務完成。在這個例子中,沒有並行的機會。

編寫任務的能力可以很容易地表達有趣的依賴關係。考慮下面 下麪的一個樹減少的實現。

import numpy as np

@ray.remote
def generate_data():
    return np.random.normal(size=1000)

@ray.remote
def aggregate_data(x, y):
    return x + y

# Generate some random data. This launches 100 tasks that will be scheduled on
# various nodes. The resulting data will be distributed around the cluster.
data = [generate_data.remote() for _ in range(100)]

# Perform a tree reduce.
while len(data) > 1:
    data.append(aggregate_data.remote(data.pop(0), data.pop(0)))

# Fetch the result.
ray.get(data)

4.5 有效地對值進行聚合

我們可以以更復雜的方式使用任務依賴。例如,假設我們希望將8個值聚合在一起。在我們的範例中,我們將進行整數加法,但在很多應用程式中,跨多臺計算機聚合大型向量可能會造成效能瓶頸。在這個時候,只要修改一行程式碼就可以將聚合的執行時間從線性降爲對數級別,即聚合值的數量。

在这里插入图片描述

圖7 左側的依賴圖深度爲7,右側的依賴圖深度爲3。計算產生相同的結果,但右側的依賴圖執行得更快。

如上所述,要將一個任務的輸出作爲輸入提供給後續任務,只需將第一個任務返回的future作爲參數傳給第二個任務。Ray的排程程式會自動考慮任務依賴關係。在第一個任務完成之前不會執行第二個任務,第一個任務的輸出將自動被髮送給執行第二個任務的機器。

import time
 
@ray.remote
def add(x, y):
 time.sleep(1)
 return x + y
 
# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)
 
# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)

code1 以線性方式聚合值與以樹形結構方式聚合值的對比

上面的程式碼非常清晰,但請注意,這兩種方法都可以使用while回圈來實現,這種方式更爲簡潔。

# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
 values = [add.remote(values[0], values[1])] + values[2:]
result = ray.get(values[0])
 
# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
 values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])

更簡潔的聚合實現方案。兩個程式碼塊之間的唯一區別是「add.remote」的輸出是放在列表的前面還是後面。

4.6 從類到actor

在不使用類的情況下開發有趣的應用程式很具挑戰性,在分佈式環境中也是如此。

你可以使用@ray.remote裝飾器宣告一個Python類。在範例化類時,Ray會建立一個新的「actor」,這是一個執行在叢集中並持有類物件副本的進程。對這個actor的方法呼叫轉變爲在actor進程上執行的任務,並且可以存取和改變actor的狀態。通過這種方式,可以在多個任務之間共用可變狀態,這是遠端函數無法做到的。

各個actor按順序執行方法(每個方法都是原子方法),因此不存在競態條件。可以通過建立多個actor來實現並行性。

@ray.remote
class Counter(object):
 def __init__(self):
 self.x = 0
 
 def inc(self):
 self.x += 1
 
 def get_value(self):
 return self.x
 
# Create an actor process.
c = Counter.remote()
 
# Check the actor's counter value.
print(ray.get(c.get_value.remote())) # 0
 
# Increment the counter twice and check the value again.
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote())) # 2

code2 將Python類範例化爲actor

上面的例子是actor最簡單的用法。Counter.remote()建立一個新的actor進程,它持有一個Counter物件副本。對c.get_value.remote()和c.inc.remote()的呼叫會在遠端actor進程上執行任務並改變actor的狀態。

4.7 actor控制代碼

在上面的範例中,我們只在主Python指令碼中呼叫actor的方法。actor的一個最強大的地方在於我們可以將控制代碼傳給它,讓其他actor或其他任務都呼叫同一actor的方法。

以下範例建立了一個可以儲存訊息的actor。幾個worker任務反覆 反復將訊息推播給actor,主Python指令碼定期讀取訊息。

import time
 
@ray.remote
class MessageActor(object):
 def __init__(self):
 self.messages = []
 
 def add_message(self, message):
 self.messages.append(message)
 
 def get_and_clear_messages(self):
 messages = self.messages
 self.messages = []
 return messages
 
# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
 for i in range(100):
 time.sleep(1)
 message_actor.add_message.remote(
 "Message {} from actor {}.".format(i, j))
 
# Create a message actor.
message_actor = MessageActor.remote()
 
# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]
 
# Periodically get the messages and print them.
for _ in range(100):
 new_messages = ray.get(message_actor.get_and_clear_messages.remote())
 print("New messages:", new_messages)
 time.sleep(1)
 
# This script prints something like the following:
# New messages: []
# New messages: ['Message 0 from actor 1.', 'Message 0 from actor 0.']
# New messages: ['Message 0 from actor 2.', 'Message 1 from actor 1.', 'Message 1 from actor 0.', 'Message 1 from actor 2.']
# New messages: ['Message 2 from actor 1.', 'Message 2 from actor 0.', 'Message 2 from actor 2.']
# New messages: ['Message 3 from actor 2.', 'Message 3 from actor 1.', 'Message 3 from actor 0.']
# New messages: ['Message 4 from actor 2.', 'Message 4 from actor 0.', 'Message 4 from actor 1.']
# New messages: ['Message 5 from actor 2.', 'Message 5 from actor 0.', 'Message 5 from actor 1.']

code3 在多個併發任務中呼叫actor的方法

actor非常強大。你可以通過它將Python類範例化爲微服務,可以從其他actor和任務(甚至其他應用程式中)查詢這個微服務。

任務和actor是Ray提供的核心抽象。這兩個概念非常通用,可用於實現複雜的應用程式,包括用於強化學習、超參數調整、加速Pandas等Ray內建庫。



參考資料:

[1] Ray. https://rise.cs.berkeley.edu/projects/ray/

[2] Ray paper. https://www.usenix.org/system/files/osdi18-moritz.pdf

[3] GitHub. https://github.com/ray-project/ray

[4] Documents. https://docs.ray.io/en/latest/index.html

[5] Blog: https://ray-project.github.io

[6] Ray: A Distributed System for AI. Robert Nishihara and Philipp Moritz. Jan 9, 2018 https://bair.berkeley.edu/blog/2018/01/09/ray/

[7] 高效能分佈式執行框架——Ray https://www.cnblogs.com/fanzhidongyzby/p/7901139.html

[8] 取代 Python 多進程!伯克利開源分佈式框架 Ray https://blog.csdn.net/weixin_33918114/article/details/89122027

[9] UC Berkeley提出新型分佈式執行框架Ray:有望取代Spark https://blog.csdn.net/weixin_34101784/article/details/87945299

[10] 基於python的高效能實時並行機器學習框架之Ray介紹 https://blog.csdn.net/lck5602/article/details/78665520

[11] UC Berkeley提出新型分佈式框架Ray:實時動態學習的開端 https://blog.csdn.net/uwr44uouqcnsuqb60zk2/article/details/78869192

[12] 繼 Spark 之後,UC Berkeley 推出新一代 AI 計算引擎 ——Ray https://www.qtmuniao.com/2019/04/06/ray/