Python Futures並行程式設計詳解

2020-07-16 10:05:02
無論哪門程式語言,並行程式設計都是一項很常用很重要的技巧。例如,爬蟲就被廣泛應用在工業界的各個領域,我們每天在各個網站、各個 App 上獲取的新聞資訊,很大一部分便是通過並行程式設計版的爬蟲獲得。

正確合理地使用並行程式設計,無疑會給程式帶來極大的效能提升。因此,本節就帶領大家一起學習 Python 中的 Futures 並行程式設計。首先,先帶領大家從程式碼的角度來理解並行程式設計中的 Futures,並進一步來比較其與單執行緒的效能區別。

假設有這樣一個任務,要下載一些網站的內容並列印,如果用單執行緒的方式,它的程式碼實現如下所示(為了突出主題,對程式碼做了簡化,忽略了例外處理):
import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))
   
def download_all(sites):
    for site in sites:
        download_one(site)

def main():
    sites = [
        'http://c.biancheng.net',
        'http://c.biancheng.net/c',
        'http://c.biancheng.net/python'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
   
if __name__ == '__main__':
    main()
輸出結果為:

Read 52053 from http://c.biancheng.net
Read 30718 from http://c.biancheng.net/c
Read 34470 from http://c.biancheng.net/python
Download 3 sites in 0.3537296 seconds

注意,此程式中,requests 模組需單獨安裝,可通過執行 pip install requests 命令進行安裝。

這種方式應該是最直接也最簡單的:
  • 先是遍歷儲存網站的列表;
  • 然後對當前網站執行下載操作;
  • 等到當前操作完成後,再對下一個網站進行同樣的操作,一直到結束。

可以看到,總共耗時約 0.35s。單執行緒的優點是簡單明瞭,但是明顯效率低下,因為上述程式的絕大多數時間都浪費在了 I/O 等待上。程式每次對一個網站執行下載操作,都必須等到前一個網站下載完成後才能開始。如果放在實際生產環境中,我們需要下載的網站數量至少是以萬為單位的,不難想象,這種方案根本行不通。

接著再來看多執行緒版本的程式碼實現:
import concurrent.futures
import requests
import threading
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)

def main():
    sites = [
        'http://c.biancheng.net',
        'http://c.biancheng.net/c',
        'http://c.biancheng.net/python'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()
執行結果為:

Read 52053 from http://c.biancheng.net
Read 30718 from http://c.biancheng.net/c
Read 34470 from http://c.biancheng.net/python

Download 3 sites in 0.1606366 seconds

可以看到,總耗時是 0.2s 左右,效率一下子提升了很多。

注意,雖然執行緒的數量可以自己定義,但是執行緒數並不是越多越好,因為執行緒的建立、維護和刪除也會有一定的開銷,所以如果設定的很大,反而可能會導致速度變慢。我們往往需要根據實際的需求做一些測試,來尋找最優的執行緒數量。

上面兩段程式碼中,多執行緒版本和單執行緒版的主要區別在於如下程式碼:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(download_one, sites)
這裡建立了一個執行緒池,總共有 5 個執行緒可以分配使用。executer.map() 與前面所講的 Python 內建的 map() 函數類似,表示對 sites 中的每一個元素並行地呼叫函數 download_one()。

在 download_one() 函數中使用的 requests.get() 方法是執行緒安全的,在多執行緒的環境下也可以安全使用,不會出現條件競爭(多個執行緒同時競爭使用同一資源)的情況。

當然,也可以用並行的方式去提高程式執行效率,只需要在 download_all() 函數中做出下面的變化即可:
with futures.ThreadPoolExecutor(workers) as executor
#=>
with futures.ProcessPoolExecutor() as executor:
這部分程式碼中,函數 ProcessPoolExecutor() 表示建立進程池,使用多個進程並行的執行程式。不過,這裡通常省略引數 workers,因為系統會自動返回 CPU 的數量作為可以呼叫的進程數。

但是,並行的方式一般用在 CPU heavy 的場景中,因為對於 I/O heavy 的操作,多數時間都會用於等待,相比於多執行緒,使用多進程並不會提升效率。反而很多時候,因為 CPU 數量的限制,會導致其執行效率不如多執行緒版本。

什麼是Futures?

Python Futures 模組,位於 concurrent.futures 和 asyncio 中,它們都表示帶有延遲的操作。Futures 會將處於等待狀態的操作包裹起來放到佇列中,這些操作的狀態隨時可以查詢,當然它們的結果(或是異常)也能夠在操作完成後被獲取。

通常來說,使用者不用考慮如何去建立 Futures,這些 Futures 底層都會幫我們處理好,唯一要做的只是去設定這些 Futures 的執行。比如,Futures 中的 Executor 類,當執行 executor.submit(func) 時,它便會安排裡面的 func() 函數執行,並返回建立好的 future 範例,以便之後查詢呼叫。

這裡再介紹一些常用的函數。比如 Futures 中的方法 done(),表示相對應的操作是否完成,返回 True 表示完成;返回 False 表示沒有完成。不過要注意的是,done() 是非阻塞的,會立即返回結果。相對應的 add_done_callback(fn),則表示 Futures 完成後,相對應的引數函數 fn 會被通知並執行呼叫。

Futures 中還有一個重要的函數 result(),它表示當 future 完成後,返回其對應的結果或異常。而 as_completed(fs),則是針對給定的 future 疊代器 fs,在其完成後返回完成後的疊代器。

所以,上述例子也可以寫成下面的形式:
import concurrent.futures
import requests
import time

def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))

def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        to_do = []
        for site in sites:
            future = executor.submit(download_one, site)
            to_do.append(future)
           
        for future in concurrent.futures.as_completed(to_do):
            future.result()
def main():
    sites = [
        'http://c.biancheng.net',
        'http://c.biancheng.net/c',
        'http://c.biancheng.net/python'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))

if __name__ == '__main__':
    main()
執行結果為:

Read 52053 from http://c.biancheng.net
Read 34470 from http://c.biancheng.net/python
Read 30718 from http://c.biancheng.net/c

Download 3 sites in 0.2275894 seconds

此程式中,首先呼叫 executor.submit(),將下載每一個網站的內容都放進 future 佇列 to_do 等待執行。然後是 as_completed() 函數在 future 完成後便輸出結果。

不過,這裡要注意,future 列表中每個 future 完成的順序和它在列表中的順序並不一定完全一致。到底哪個先完成、哪個後完成,取決於系統的排程和每個 future 的執行時間。