使用Pytorch進行多卡訓練

2022-10-12 15:00:08

  當一塊GPU不夠用時,我們就需要使用多卡進行並行訓練。其中多卡並行可分為資料並行和模型並行。具體區別如下圖所示:

  由於模型並行比較少用,這裡只對資料並行進行記錄。對於pytorch,有兩種方式可以進行資料並行:資料並行(DataParallel, DP)和分散式資料並行(DistributedDataParallel, DDP)。

  在多卡訓練的實現上,DP與DDP的思路是相似的:

  1、每張卡都複製一個有相同引數的模型副本。

  2、每次迭代,每張卡分別輸入不同批次資料,分別計算梯度。

  3、DP與DDP的主要不同在於接下來的多卡通訊:

  DP的多卡互動實現在一個程序之中,它將一張卡視為主卡,維護單獨模型優化器。所有卡計算完梯度後,主卡匯聚其它卡的梯度進行平均並用優化器更新模型引數,再將模型引數更新至其它卡上。

  DDP則分別為每張卡建立一個程序,每個程序相應的卡上都獨立維護模型和優化器。在每次每張卡計算完梯度之後,程序之間以NCLL(NVIDIA GPU通訊)為通訊後端,使各卡獲取其它卡的梯度。各卡對獲取的梯度進行平均,然後執行後續的引數更新。由於每張卡上的模型與優化器引數在初始化時就保持一致,而每次迭代的平均梯度也保持一致,那麼即使沒有進行引數複製,所有卡的模型引數也是保持一致的。

  Pytorch官方推薦我們使用DDP。DP經過我的實驗,兩塊GPU甚至比一塊還慢。當然不同模型可能有不同的結果。下面分別對DP和DDP進行記錄。

DP

   Pytorch的DP實現多GPU訓練十分簡單,只需在單GPU的基礎上加一行程式碼即可。以下是一個DEMO的程式碼。

import torch
from torch import nn
from torch.optim import Adam
from torch.nn.parallel import DataParallel

class DEMO_model(nn.Module):
  def __init__(self, in_size, out_size):
    super().__init__()
    self.fc = nn.Linear(in_size, out_size)
  def forward(self, inp):
    outp = self.fc(inp)
    print(inp.shape, outp.device)
    return outp

model = DEMO_model(10, 5).to('cuda')
model = DataParallel(model, device_ids=[0, 1]) # 額外加這一行
adam = Adam(model.parameters())

# 進行訓練
for i in range(1):
  x = torch.rand([128, 10]) # 獲取訓練資料,無需指定裝置
  y = model(x) # 自動均勻劃分資料批次並分配至各GPU,輸出結果y會聚集到GPU0中
  loss = torch.norm(y)
  loss.backward()
  adam.step()

  其中model = DataParallel(model, device_ids=[0, 1])這行將模型複製到0,1號GPU上。輸入資料x無需指定裝置,它將會被均勻分配至各塊GPU模型,進行前向傳播。之後各塊GPU的輸出再合併到GPU0中,得到輸出y。輸出y在GPU0中計算損失,並進行反向傳播計算梯度、優化器更新引數。

DDP

  為了對分散式程式設計有基本概念,首先使用pytorch內部的方法實現一個多程序程式,再使用DDP模組實現模型的分散式訓練。

Pytorch分散式基礎

  首先使用pytorch內部的方法編寫一個多程序程式作為編寫分散式訓練的基礎。

import os, torch
import torch.multiprocessing as mp
import torch.distributed as dist

def run(rank, size):
  tensor = torch.tensor([1,2,3,4], device='cuda:'+str(rank)) # ——1—— 
  group = dist.new_group(range(size)) # ——2——
  dist.all_reduce(tensor=tensor, group=group, op=dist.ReduceOp.SUM) # ——3——
  print(str(rank)+ ': ' + str(tensor) + '\n')

def ini_process(rank, size, fn, backend = 'nccl'):  
  os.environ['MASTER_ADDR'] = '127.0.0.1' # ——4——
  os.environ['MASTER_PORT'] = '1234'
  dist.init_process_group(backend, rank=rank, world_size=size) # ——5——
  fn(rank, size) # ——6——

if __name__ == '__main__': # ——7——
  mp.set_start_method('spawn') # ——8—— 
  size = 2 # ——9—— 
  ps = []  
  for rank in range(size):
    p = mp.Process(target=ini_process, args=(rank, size, run)) # ——10—— 
    p.start()  
    ps.append(p)

  for p in ps: # ——11—— 
    p.join()

  以上程式碼主程序建立了兩個子程序,子程序之間使用NCCL後端進行通訊。每個子程序各佔用一個GPU資源,實現了所有GPU張量求和的功能。細節註釋如下:

  1、為每個子程序定義相同名稱的張量,並分別分配至不同的GPU,從而能進行後續的GPU間通訊。

  2、定義一個通訊組,用於後面的all_reduce通訊操作。

  3、all_reduce操作以及其它通訊方式請看下圖:

  4、定義編號(rank)為0的ip和埠地址,讓每個子程序都知道。ip和埠地址可以隨意定義,不衝突即可。如果不設定,子程序在涉及程序通訊時會出錯。

  5、初始化子行程群組,定義程序間的通訊後端(還有GLOO、MPI,只有NCCL支援GPU間通訊)、子程序rank、子程序數量。只有當該函數在size個程序中被呼叫時,各程序才會繼續從這裡執行下去。這個函數統一了各子程序後續程式碼的開始時間。

  6、執行子程序程式碼。

  7、由於建立子程序會執行本程式,因此主程序的執行需要放在__main__裡,防止子程序執行。

  8、開始建立子程序的方式:spawn、fork。windows預設spawn,linux預設fork。具體區別請百度。

  9、由於是以NCCL為通訊後端的分散式訓練,如果不同程序中相同名稱的張量在同一GPU上,當這個張量進行程序間通訊時就會出錯。為了防止出錯,限制每張卡獨佔一個程序,每個程序獨佔一張卡。這裡有兩張卡,所以最多隻能建立兩個程序。

  10、建立子程序,傳入子程序的初始化方法,及子程序呼叫該方法的引數。

  11、等待子程序全部執行完畢後再退出主程序。 

  輸出結果如下:

  正是各程序儲存在不同GPU上的張量的廣播求和(all_reduce)的結果。

  參考: https://pytorch.org/tutorials/intermediate/dist_tuto.html

Pytorch分散式訓練DEMO

  我們實際上可以根據上面的分散式基礎寫一個分散式訓練,但由於不知道pytorch如何實現GPU間模型梯度的求和,即官方教學中所謂的ring_reduce(沒找到相關API),時間原因,就不再去搜尋相關方法了。這裡僅記錄pytorh內部的分散式模型訓練,即利用DDP模組實現。Pytorch版本1.12.1。

import torch,os
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch import nn


def example(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)  # ——1——
    model = nn.Linear(2, 1, False).to(rank) 
    if rank == 0: # ——2——
        model.load_state_dict(torch.load('model_weight')) 
    # model_stat = torch.load('model_weight', {'cuda:0':'cuda:%d'%rank})  #這樣讀取保險一點
    # model.load_state_dict(model_stat) 
    opt = optim.Adam(model.parameters(), lr=0.0001) # ——3——
    opt_stat = torch.load('opt_weight', {'cuda:0':'cuda:%d'%rank}) # ——4——
    opt.load_state_dict(opt_stat) # ——5——
    ddp_model = DDP(model, device_ids=[rank])# ——6
    inp = torch.tensor([[1.,2]]).to(rank) # ——7——
    labels = torch.tensor([[5.]]).to(rank)
    outp = ddp_model(inp)
    loss = torch.mean((outp - labels)**2)
    opt.zero_grad()
    loss.backward() # ——8——

    opt.step() # ——9
    if rank == 0:# ——10——
        torch.save(model.state_dict(), 'model_weight')
        torch.save(opt.state_dict(), 'opt_weight')
    

if __name__=="__main__":
    os.environ["MASTER_ADDR"] = "localhost"# ——11——
    os.environ["MASTER_PORT"] = "29500"
    world_size = 2
    mp.spawn(example, args=(world_size,), nprocs=world_size, join=True) # ——12——

  以上程式碼包含模型在多GPU上讀取權重、進行分散式訓練、儲存權重等過程。細節註釋如下:

  1、初始化行程群組,由於使用GPU通訊,後端應該寫為NCCL。不過經過實驗,即使錯寫為gloo,DDP內部也會自動使用NCCL作為通訊模組。

  2、由於後面使用DDP包裹模型進行訓練,其內部會自動將所有rank的模型權重同步為rank 0的權重,因此我們只需在rank 0上讀取模型權重即可。這是基於Pytorch版本1.12.1,低階版本似乎沒有這個特性,需要在不同rank分別匯入權重,則load需要傳入map_location,如下面註釋的兩行程式碼所示。

  3、這裡建立model的優化器,而不是建立用ddp包裹後的ddp_model的優化器,是為了相容單GPU訓練,讀取優化器權重更方便。

  4、將優化器權重讀取至該程序佔用的GPU。如果沒有map_location引數,load會將權重讀取到原本儲存它時的裝置。

  5、優化器獲取權重。經過實驗,即使權重不在優化器所在的GPU,權重也會遷移過去而不會報錯。當然load直接讀取到相應GPU會減少資料傳輸。

  6、DDP包裹模型,為模型複製一個副本到相應GPU中。所有rank的模型副本會與rank 0保持一致。注意,DDP並不複製模型優化器的副本,因此各程序的優化器需要我們在初始化時保持一致。權重要麼不讀取,要麼都讀取。

  7、這裡開始模型的訓練。資料需轉移到相應的GPU裝置。

  8、在backward中,所有程序的模型計算梯度後,會進行平均(不是相加)。也就是說,DDP在backward函數新增了hook,所有程序的模型梯度的ring_reduce將在這裡執行。這個可以通過給各程序模型分別輸入不同的資料進行驗證,backward後這些模型有相同的梯度,且驗算的確是所有程序梯度的平均。此外,還可以驗證backward函數會阻斷(block)各程序使用梯度,只有當所有程序都完成backward之後,各程序才能讀取和使用梯度。這保證了所有程序在梯度上的一致性。

  9、各程序優化器使用梯度更新其模型副本權重。由於初始化時各程序模型、優化器權重一致,每次反向傳播梯度也保持一致,則所有程序的模型在整個訓練過程中都能保持一致。

  10、由於所有程序權重保持一致,我們只需通過一個程序儲存即可。

  11、定義rank 0的IP和埠,使用mp.spawn,只需在主程序中定義即可,無需分別在子程序中定義。

  12、建立子程序,傳入:子程序呼叫的函數(該函數第一個引數必須是rank)、子程序函數的引數(除了rank引數外)、子程序數、是否等待所有子程序建立完畢再開始執行。

  參考: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html