Python Joblib庫使用學習總結

2023-06-11 06:00:42

實踐環境

python 3.6.2

Joblib

簡介

Joblib是一組在Python中提供輕量級流水線的工具。特別是:

  1. 函數的透明磁碟快取和延遲重新計算(記憶模式)
  2. 簡單易用的平行計算

Joblib已被優化得很快速,很健壯了,特別是在巨量資料上,並對numpy陣列進行了特定的優化。

主要功能

  1. 輸出值的透明快速磁碟快取(Transparent and fast disk-caching of output value): Python函數的記憶體化或類似make的功能,適用於任意Python物件,包括非常大的numpy陣列。通過將操作寫成一組具有定義良好的輸入和輸出的步驟:Python函數,將永續性和流執行邏輯與域邏輯或演演算法程式碼分離開來。Joblib可以將其計算儲存到磁碟上,並僅在必要時重新執行:

    原文:

    Transparent and fast disk-caching of output value: a memoize or make-like functionality for Python functions that works well for arbitrary Python objects, including very large numpy arrays. Separate persistence and flow-execution logic from domain logic or algorithmic code by writing the operations as a set of steps with well-defined inputs and outputs: Python functions. Joblib can save their computation to disk and rerun it only if necessary:

    >>> from joblib import Memory
    >>> cachedir = 'your_cache_dir_goes_here'
    >>> mem = Memory(cachedir)
    >>> import numpy as np
    >>> a = np.vander(np.arange(3)).astype(float)
    >>> square = mem.cache(np.square)
    >>> b = square(a)                                   
    ______________________________________________________________________...
    [Memory] Calling square...
    square(array([[0., 0., 1.],
           [1., 1., 1.],
           [4., 2., 1.]]))
    _________________________________________________...square - ...s, 0.0min
    
    >>> c = square(a)
    # The above call did not trigger an evaluation
    
  2. 並行助手(parallel helper):輕鬆編寫可讀的並行程式碼並快速偵錯

    >>> from joblib import Parallel, delayed
    >>> from math import sqrt
    >>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
    
    >>> res = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
    >>> res
    [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
    
  3. 快速壓縮的持久化(Fast compressed Persistence):代替pickle在包含巨量資料的Python物件上高效工作(joblib.dump&joblib.load)。

parallel for loops

常見用法

Joblib提供了一個簡單的助手類,用於使用多程序為迴圈實現並行。核心思想是將要執行的程式碼編寫為生成器表示式,並將其轉換為平行計算

>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

使用以下程式碼,可以分佈到2個CPU上:

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

輸出可以是一個生成器,在可以獲取結果時立即返回結果,即使後續任務尚未完成。輸出的順序始終與輸入的順序相匹配:輸出的順序總是匹配輸入的順序:

>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> parallel = Parallel(n_jobs=2, return_generator=True) # py3.7往後版本才支援return_generator引數
>>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
>>> print(type(output_generator))
<class 'generator'>
>>> print(next(output_generator))
0.0
>>> print(next(output_generator))
1.0
>>> print(list(output_generator))
[2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

此生成器允許減少joblib.Parallel的記憶體佔用呼叫

基於執行緒的並行VS基於程序的並行

預設情況下,joblib.Parallel使用'loky'後端模組啟動單獨的Python工作程序,以便在分散的CPU上同時執行任務。對於一般的Python程式來說,這是一個合理的預設值,但由於輸入和輸出資料需要在佇列中序列化以便同工作程序進行通訊,因此可能會導致大量開銷(請參閱序列化和程序)。

當你知道你呼叫的函數是基於一個已編譯的擴充套件,並且該擴充套件在大部分計算過程中釋放了Python全域性直譯器鎖(GIL)時,使用執行緒而不是Python程序作為並行工作者會更有效。例如,在Cython函數的with nogil 塊中編寫CPU密集型程式碼。

如果希望程式碼有效地使用執行緒,只需傳遞preferre='threads'作為joblib.Parallel建構函式的引數即可。在這種情況下,joblib將自動使用"threading"後端,而不是預設的"loky"後端

>>> Parallel(n_jobs=2, prefer=threads')(
...     delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

也可以在上下文管理器的幫助下手動選擇特定的後端實現:

>>> from joblib import parallel_backend
>>> with parallel_backend('threading', n_jobs=2):
...    Parallel()(delayed(sqrt)(i ** 2) for i in range(10))
...
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

後者在呼叫內部使用joblib.Parallel的庫時特別有用,不會將後端部分作為其公共API的一部分公開。

'loky'後端可能並不總是可獲取。

一些罕見的系統不支援多處理(例如Pyodide)。在這種情況下,loky後端不可用,使用執行緒作為預設後端。

除了內建的joblib後端之外,還可以使用幾個特定於叢集的後端:

序列化與程序

要在多個python程序之間共用函數定義,必須依賴序列化協定。python中的標準協定是pickle ,但它在標準庫中的預設實現有幾個限制。例如,它不能序列化互動式定義的函數或在__main__模組中定義的函數。

為了避免這種限制,loky後端現在依賴於cloudpickle以序列化python物件。cloudpicklepickle協定的另一種實現方式,允許序列化更多的物件,特別是互動式定義的函數。因此,對於大多數用途,loky後端應該可以完美的工作。

cloudpickle的主要缺點就是它可能比標準類庫中的pickle慢,特別是,對於大型python字典或列表來說,這一點至關重要,因為它們的序列化時間可能慢100倍。有兩種方法可以更改 joblib的序列化過程以緩和此問題:

  • 如果您在UNIX系統上,則可以切換回舊的multiprocessing後端。有了這個後端,可以使用很快速的pickle在工作程序中共用互動式定義的函數。該解決方案的主要問題是,使用fork啟動程序會破壞標準POSIX,並可能與numpyopenblas等第三方庫進行非正常互動。

  • 如果希望將loky後端與不同的序列化庫一起使用,則可以設定LOKY_PICKLER=mod_pickle環境變數,以使用mod_pickle作為loky的序列化庫。作為引數傳遞的模組mod_pickle應按import mod_picke匯入,並且應包含一個Pickler 物件,該物件將用於序列化為物件。可以設定LOKY_PICKLER=pickle以使用表中類庫中的pickling模組。LOKY_PICKLER=pickle的主要缺點是不能序列化互動式定義的函數。為了解決該問題,可以將此解決方案與joblib.wrap_non_picklable_objects() 一起使用,joblib.wrap_non_picklable_objects()可用作裝飾器以為特定對下本地啟用cloudpickle。通過這種方式,可以為所有python物件使用速度快的picking,並在本地為互動式函數啟用慢速的pickling。查閱loky_wrapper獲取範例。

共用記憶體語意

joblib的預設後端將在獨立的Python程序中執行每個函數呼叫,因此它們不能更改主程式中定義的公共Python物件。

然而,如果並行函數確實需要依賴於執行緒的共用記憶體語意,則應顯示的使用require='sharemem',例如:

>>> shared_set = set()
>>> def collect(x):
...    shared_set.add(x)
...
>>> Parallel(n_jobs=2, require='sharedmem')(
...     delayed(collect)(i) for i in range(5))
[None, None, None, None, None]
>>> sorted(shared_set)
[0, 1, 2, 3, 4]

請記住,從效能的角度來看,依賴共用記憶體語意可能是次優的,因為對共用Python物件的並行存取將受到鎖爭用的影響。

注意,不使用共用記憶體的情況下,任務程序之間的記憶體資源是相互獨立的,舉例說明如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import time
import threading
from joblib import Parallel, delayed, parallel_backend
from collections import deque

GLOBAL_LIST = []

class TestClass():
    def __init__(self):
        self.job_queue = deque()

    def add_jobs(self):
        i = 0
        while i < 3:
            time.sleep(1)
            i += 1
            GLOBAL_LIST.append(i)
            self.job_queue.append(i)
            print('obj_id:', id(self),  'job_queue:', self.job_queue, 'global_list:', GLOBAL_LIST)


def get_job_queue_list(obj):
    i = 0
    while not obj.job_queue and i < 3:
        time.sleep(1)
        i += 1
        print('obj_id:', id(obj), 'job_queue:', obj.job_queue, 'global_list:', GLOBAL_LIST)
    return obj.job_queue


if __name__ == "__main__":
    obj = TestClass()

    def test_fun():
        with parallel_backend("multiprocessing", n_jobs=2):
            Parallel()(delayed(get_job_queue_list)(obj) for i in range(2))

    thread = threading.Thread(target=test_fun, name="parse_log")
    thread.start()

    time.sleep(1)
    obj.add_jobs()
    print('global_list_len:', len(GLOBAL_LIST))

控制檯輸出:

obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1]) global_list: [1]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2]) global_list: [1, 2]
obj_id: 1554577912664 job_queue: deque([]) global_list: []
obj_id: 1930069893920 job_queue: deque([]) global_list: []
obj_id: 2378500766968 job_queue: deque([1, 2, 3]) global_list: [1, 2, 3]
global_list_len: 3

通過輸出可知,通過joblib.Parallel開啟的程序,其佔用記憶體和主執行緒佔用的記憶體資源是相互獨立

複用worer池

一些演演算法需要對並行函數進行多次連續呼叫,同時對中間結果進行處理。在一個迴圈中多次呼叫joblib.Parallel次優的,因為它會多次建立和銷燬一個workde(執行緒或程序)池,這可能會導致大量開銷。

在這種情況下,使用joblib.Parallel類的上下文管理器API更有效,以便對joblib.Parallel物件的多次呼叫可以複用同一worker池。

from joblib import Parallel, delayed
from math import sqrt

with Parallel(n_jobs=2) as parallel:
   accumulator = 0.
   n_iter = 0
   while accumulator < 1000:
       results = parallel(delayed(sqrt)(accumulator + i ** 2) for i in range(5))
       accumulator += sum(results)  # synchronization barrier
       n_iter += 1

print(accumulator, n_iter)  #輸出: 1136.5969161564717 14                          

請注意,現在基於程序的並行預設使用'loky'後端,該後端會自動嘗試自己維護和重用worker池,即使是在沒有上下文管理器的呼叫中也是如此

筆者實踐發現,即便採用這種實現方式,其執行效率也是非常低下的,應該儘量避免這種設計(實踐環境 Python3.6)

...略

Parallel參考檔案

class joblib.Parallel(n_jobs=default(None), backend=None, return_generator=False, verbose=default(0), timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None))

常用引數說明

  • n_jobs:int, 預設:None

    並行執行作業的最大數量,例如當backend='multiprocessing'時Python工作程序的數量,或者當backend='threading'時執行緒池的大小。如果設定為 -1,則使用所有CPU。如果設定為1,則根本不使用平行計算程式碼,並且行為相當於一個簡單的python for迴圈。此模式與timeout不相容。如果n_jobs小於-1,則使用(n_cpus+1+n_jobs)。因此,如果n_jobs=-2,將使用除一個CPU之外的所有CPU。如果為None,則預設n_jobs=1,除非在parallel_backend()上下文管理器下執行呼叫,此時會為n_jobs設定另一個值。

  • backend: str, ParallelBackendBase範例或者None, 預設: 'loky'

    指定並行化後端實現。支援的後端有:

    • loky 在與工作Python程序交換輸入和輸出資料時,預設使用的loky可能會導致一些通訊和記憶體開銷。在一些罕見的系統(如Pyiode)上,loky後端可能不可用。

    • multiprocessing 以前基於程序的後端,基於multiprocessing.Pool。不如loky健壯。

    • threading 是一個開銷很低的後端,但如果被呼叫的函數大量依賴於Python物件,它會受到Python GIL的影響。當執行瓶頸是顯式釋放GIL的已編譯擴充套件時,threading最有用(例如,with-nogil塊中封裝的Cython迴圈或對NumPy等庫的昂貴呼叫)。

    • 最後,可以通過呼叫register_pallel_backend()來註冊後端。

    不建議在類庫中呼叫Parallel時對backend名稱進行寫死,取而代之,建議設定軟提示(prefer)或硬約束(require),以便庫使用者可以使用parallel_backend()上下文管理器從外部更改backend

  • return_generator: bool

    如果為True,則對此範例的呼叫將返回一個生成器,並在結果可獲取時立即按原始順序返回結果。請注意,預期用途是一次執行一個呼叫。對同一個Parallel物件的多次呼叫將導致RuntimeError

  • prefer: str 可選值 ‘processes’, ‘threads’ ,None, 預設: None

    如果使用parallel_backen()上下文管理器時沒有指定特定後端,則選擇預設prefer給定值。預設的基於程序的後端是loky,而預設的基於執行緒的後端則是threading。如果指定了backend引數,則忽略該引數。

  • require: ‘sharedmem’ 或者None, 預設None

    用於選擇後端的硬約束。如果設定為'sharedmem',則所選後端將是單主機和基於執行緒的,即使使用者要求使用具有parallel_backend的非基於執行緒的後端。

參考檔案

https://joblib.readthedocs.io/en/latest/

https://joblib.readthedocs.io/

https://joblib.readthedocs.io/en/latest/parallel.html#common-usage