進軍Python全棧開發--12.進程和執行緒

2020-08-13 21:06:30

進程和執行緒

今天我們使用的計算機早已進入多CPU或多核時代,而我們使用的操作系統都是支援「多工」的操作系統,這使得我們可以同時執行多個程式,也可以將一個程式分解爲若幹個相對獨立的子任務,讓多個子任務併發的執行,從而縮短程式的執行時間,同時也讓使用者獲得更好的體驗。因此在當下不管是用什麼程式語言進行開發,實現讓程式同時執行多個任務也就是常說的「併發程式設計」,應該是程式設計師必備技能之一。爲此,我們需要先討論兩個概念,一個叫進程,一個叫執行緒。

概念

進程就是操作系統中執行的一個程式,操作系統以進程爲單位分配儲存空間,每個進程都有自己的地址空間、數據棧以及其他用於跟蹤進程執行的輔助數據,操作系統管理所有進程的執行,爲它們合理的分配資源。進程可以通過fork或spawn的方式來建立新的進程來執行其他的任務,不過新的進程也有自己獨立的記憶體空間,因此必須通過進程間通訊機制 機製(IPC,Inter-Process Communication)來實現數據共用,具體的方式包括管道、信號、通訊端、共用記憶體區等。

一個進程還可以擁有多個併發的執行線索,簡單的說就是擁有多個可以獲得CPU排程的執行單元,這就是所謂的執行緒。由於執行緒在同一個進程下,它們可以共用相同的上下文,因此相對於進程而言,執行緒間的資訊共用和通訊更加容易。當然在單核CPU系統中,真正的併發是不可能的,因爲在某個時刻能夠獲得CPU的只有唯一的一個執行緒,多個執行緒共用了CPU的執行時間。使用多執行緒實現併發程式設計爲程式帶來的好處是不言而喻的,最主要的體現在提升程式的效能和改善使用者體驗,今天我們使用的軟體幾乎都用到了多執行緒技術,這一點可以利用系統自帶的進程監控工具(如macOS中的「活動監視器」、Windows中的「工作管理員」)來證實,如下圖所示。

[外連圖片轉存失敗,源站可能有防盜鏈機制 機製,建議將圖片儲存下來直接上傳(img-nS8BbMyg-1597321178272)(./res/macos-monitor.png)]

當然多執行緒也並不是沒有壞處,站在其他進程的角度,多執行緒的程式對其他程式並不友好,因爲它佔用了更多的CPU執行時間,導致其他程式無法獲得足夠的CPU執行時間;另一方面,站在開發者的角度,編寫和偵錯多執行緒的程式都對開發者有較高的要求,對於初學者來說更加困難。

Python既支援多進程又支援多執行緒,因此使用Python實現併發程式設計主要有3種方式:多進程、多執行緒、多進程+多執行緒。

Python中的多進程

Unix和Linux操作系統上提供了fork()系統呼叫來建立進程,呼叫fork()函數的是父進程,建立出的是子進程,子進程是父進程的一個拷貝,但是子進程擁有自己的PID。fork()函數非常特殊它會返回兩次,父進程中可以通過fork()函數的返回值得到子進程的PID,而子進程中的返回值永遠都是0。Python的os模組提供了fork()函數。由於Windows系統沒有fork()呼叫,因此要實現跨平臺的多進程程式設計,可以使用multiprocessing模組的Process類來建立子進程,而且該模組還提供了更高階的封裝,例如批次啓動進程的進程池(Pool)、用於進程間通訊的佇列(Queue)和管道(Pipe)等。

下面 下麪用一個下載檔案的例子來說明使用多進程和不使用多進程到底有什麼差別,先看看下面 下麪的程式碼。

from random import randint
from time import time, sleep


def download_task(filename):
    print('開始下載%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下載完成! 耗費了%d秒' % (filename, time_to_download))


def main():
    start = time()
    download_task('Python從入門到住院.pdf')
    download_task('Peking Hot.avi')
    end = time()
    print('總共耗費了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

下面 下麪是執行程式得到的一次執行結果。

開始下載Python從入門到住院.pdf...
Python從入門到住院.pdf下載完成! 耗費了6秒
開始下載Peking Hot.avi...
Peking Hot.avi下載完成! 耗費了7秒
總共耗費了13.01秒.

從上面的例子可以看出,如果程式中的程式碼只能按順序一點點的往下執行,那麼即使執行兩個毫不相關的下載任務,也需要先等待一個檔案下載完成後才能 纔能開始下一個下載任務,很顯然這並不合理也沒有效率。接下來我們使用多進程的方式將兩個下載任務放到不同的進程中,程式碼如下所示。

from multiprocessing import Process
from os import getpid
from random import randint
from time import time, sleep


def download_task(filename):
    print('啓動下載進程,進程號[%d].' % getpid())
    print('開始下載%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下載完成! 耗費了%d秒' % (filename, time_to_download))


def main():
    start = time()
    p1 = Process(target=download_task, args=('Python從入門到住院.pdf', ))
    p1.start()
    p2 = Process(target=download_task, args=('Peking Hot.avi', ))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print('總共耗費了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

在上面的程式碼中,我們通過Process類建立了進程物件,通過target參數我們傳入一個函數來表示進程啓動後要執行的程式碼,後面的args是一個元組,它代表了傳遞給函數的參數。Process物件的start方法用來啓動進程,而join方法表示等待進程執行結束。執行上面的程式碼可以明顯發現兩個下載任務「同時」啓動了,而且程式的執行時間將大大縮短,不再是兩個任務的時間總和。下面 下麪是程式的一次執行結果。

啓動下載進程,進程號[1530].
開始下載Python從入門到住院.pdf...
啓動下載進程,進程號[1531].
開始下載Peking Hot.avi...
Peking Hot.avi下載完成! 耗費了7秒
Python從入門到住院.pdf下載完成! 耗費了10秒
總共耗費了10.01秒.

我們也可以使用subprocess模組中的類和函數來建立和啓動子進程,然後通過管道來和子進程通訊,這些內容我們不在此進行講解,有興趣的讀者可以自己瞭解這些知識。接下來我們將重點放在如何實現兩個進程間的通訊。我們啓動兩個進程,一個輸出Ping,一個輸出Pong,兩個進程輸出的Ping和Pong加起來一共10個。聽起來很簡單吧,但是如果這樣寫可是錯的哦。

from multiprocessing import Process
from time import sleep

counter = 0


def sub_task(string):
    global counter
    while counter < 10:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.01)

        
def main():
    Process(target=sub_task, args=('Ping', )).start()
    Process(target=sub_task, args=('Pong', )).start()


if __name__ == '__main__':
    main()

看起來沒毛病,但是最後的結果是Ping和Pong各輸出了10個,Why?當我們在程式中建立進程的時候,子進程複製了父進程及其所有的數據結構,每個子進程有自己獨立的記憶體空間,這也就意味着兩個子進程中各有一個counter變數,所以結果也就可想而知了。要解決這個問題比較簡單的辦法是使用multiprocessing模組中的Queue類,它是可以被多個進程共用的佇列,底層是通過管道和號志(semaphore)機制 機製來實現的,有興趣的讀者可以自己嘗試一下。

Python中的多執行緒

在Python早期的版本中就引入了thread模組(現在名爲_thread)來實現多執行緒程式設計,然而該模組過於底層,而且很多功能都沒有提供,因此目前的多執行緒開發我們推薦使用threading模組,該模組對多執行緒程式設計提供了更好的物件導向的封裝。我們把剛纔下載檔案的例子用多執行緒的方式來實現一遍。

from random import randint
from threading import Thread
from time import time, sleep


def download(filename):
    print('開始下載%s...' % filename)
    time_to_download = randint(5, 10)
    sleep(time_to_download)
    print('%s下載完成! 耗費了%d秒' % (filename, time_to_download))


def main():
    start = time()
    t1 = Thread(target=download, args=('Python從入門到住院.pdf',))
    t1.start()
    t2 = Thread(target=download, args=('Peking Hot.avi',))
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('總共耗費了%.3f秒' % (end - start))


if __name__ == '__main__':
    main()

我們可以直接使用threading模組的Thread類來建立執行緒,但是我們之前講過一個非常重要的概念叫「繼承」,我們可以從已有的類建立新類,因此也可以通過繼承Thread類的方式來建立自定義的執行緒類,然後再建立執行緒物件並啓動執行緒。程式碼如下所示。

from random import randint
from threading import Thread
from time import time, sleep


class DownloadTask(Thread):

    def __init__(self, filename):
        super().__init__()
        self._filename = filename

    def run(self):
        print('開始下載%s...' % self._filename)
        time_to_download = randint(5, 10)
        sleep(time_to_download)
        print('%s下載完成! 耗費了%d秒' % (self._filename, time_to_download))


def main():
    start = time()
    t1 = DownloadTask('Python從入門到住院.pdf')
    t1.start()
    t2 = DownloadTask('Peking Hot.avi')
    t2.start()
    t1.join()
    t2.join()
    end = time()
    print('總共耗費了%.2f秒.' % (end - start))


if __name__ == '__main__':
    main()

因爲多個執行緒可以共用進程的記憶體空間,因此要實現多個執行緒間的通訊相對簡單,大家能想到的最直接的辦法就是設定一個全域性變數,多個執行緒共用這個全域性變數即可。但是當多個執行緒共用同一個變數(我們通常稱之爲「資源」)的時候,很有可能產生不可控的結果從而導致程式失效甚至崩潰。如果一個資源被多個執行緒競爭使用,那麼我們通常稱之爲「臨界資源」,對「臨界資源」的存取需要加上保護,否則資源會處於「混亂」的狀態。下面 下麪的例子演示了100個執行緒向同一個銀行賬戶轉賬(轉入1元錢)的場景,在這個例子中,銀行賬戶就是一個臨界資源,在沒有保護的情況下我們很有可能會得到錯誤的結果。

from time import sleep
from threading import Thread


class Account(object):

    def __init__(self):
        self._balance = 0

    def deposit(self, money):
        # 計算存款後的餘額
        new_balance = self._balance + money
        # 模擬受理存款業務需要0.01秒的時間
        sleep(0.01)
        # 修改賬戶餘額
        self._balance = new_balance

    @property
    def balance(self):
        return self._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    # 建立100個存款的執行緒向同一個賬戶中存錢
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    # 等所有存款的執行緒都執行完畢
    for t in threads:
        t.join()
    print('賬戶餘額爲: ¥%d元' % account.balance)


if __name__ == '__main__':
    main()

執行上面的程式,結果讓人大跌眼鏡,100個執行緒分別向賬戶中轉入1元錢,結果居然遠遠小於100元。之所以出現這種情況是因爲我們沒有對銀行賬戶這個「臨界資源」加以保護,多個執行緒同時向賬戶中存錢時,會一起執行到new_balance = self._balance + money這行程式碼,多個執行緒得到的賬戶餘額都是初始狀態下的0,所以都是0上面做了+1的操作,因此得到了錯誤的結果。在這種情況下,「鎖」就可以派上用場了。我們可以通過「鎖」來保護「臨界資源」,只有獲得「鎖」的執行緒才能 纔能存取「臨界資源」,而其他沒有得到「鎖」的執行緒只能被阻塞起來,直到獲得「鎖」的執行緒釋放了「鎖」,其他執行緒纔有機會獲得「鎖」,進而存取被保護的「臨界資源」。下面 下麪的程式碼演示瞭如何使用「鎖」來保護對銀行賬戶的操作,從而獲得正確的結果。

from time import sleep
from threading import Thread, Lock


class Account(object):

    def __init__(self):
        self._balance = 0
        self._lock = Lock()

    def deposit(self, money):
        # 先獲取鎖才能 纔能執行後續的程式碼
        self._lock.acquire()
        try:
            new_balance = self._balance + money
            sleep(0.01)
            self._balance = new_balance
        finally:
            # 在finally中執行釋放鎖的操作保證正常異常鎖都能釋放
            self._lock.release()

    @property
    def balance(self):
        return self._balance


class AddMoneyThread(Thread):

    def __init__(self, account, money):
        super().__init__()
        self._account = account
        self._money = money

    def run(self):
        self._account.deposit(self._money)


def main():
    account = Account()
    threads = []
    for _ in range(100):
        t = AddMoneyThread(account, 1)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print('賬戶餘額爲: ¥%d元' % account.balance)


if __name__ == '__main__':
    main()

比較遺憾的一件事情是Python的多執行緒並不能發揮CPU的多核特性,這一點只要啓動幾個執行死回圈的執行緒就可以得到證實了。之所以如此,是因爲Python的直譯器有一個「全域性直譯器鎖」(GIL)的東西,任何執行緒執行前必須先獲得GIL鎖,然後每執行100條位元組碼,直譯器就自動釋放GIL鎖,讓別的執行緒有機會執行,這是一個歷史遺留問題,但是即便如此,就如我們之前舉的例子,使用多執行緒在提升執行效率和改善使用者體驗方面仍然是有積極意義的。

多進程還是多執行緒

無論是多進程還是多執行緒,只要數量一多,效率肯定上不去,爲什麼呢?我們打個比方,假設你不幸正在準備中考,每天晚上需要做語文、數學、英語、物理、化學這5科的作業,每項作業耗時1小時。如果你先花1小時做語文作業,做完了,再花1小時做數學作業,這樣,依次全部做完,一共花5小時,這種方式稱爲單任務模型。如果你打算切換到多工模型,可以先做1分鐘語文,再切換到數學作業,做1分鐘,再切換到英語,以此類推,只要切換速度足夠快,這種方式就和單核CPU執行多工是一樣的了,以旁觀者的角度來看,你就正在同時寫5科作業。

但是,切換作業是有代價的,比如從語文切到數學,要先收拾桌子上的語文書本、鋼筆(這叫儲存現場),然後,開啓數學課本、找出圓規直尺(這叫準備新環境),才能 纔能開始做數學作業。操作系統在切換進程或者執行緒時也是一樣的,它需要先儲存當前執行的現場環境(CPU暫存器狀態、記憶體頁等),然後,把新任務的執行環境準備好(恢復上次的暫存器狀態,切換記憶體頁等),才能 纔能開始執行。這個切換過程雖然很快,但是也需要耗費時間。如果有幾千個任務同時進行,操作系統可能就主要忙着切換任務,根本沒有多少時間去執行任務了,這種情況最常見的就是硬碟狂響,點視窗無反應,系統處於假死狀態。所以,多工一旦多到一個限度,反而會使得系統效能急劇下降,最終導致所有任務都做不好。

是否採用多工的第二個考慮是任務的型別,可以把任務分爲計算密集型和I/O密集型。計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如對視訊進行編碼解碼或者格式轉換等等,這種任務全靠CPU的運算能力,雖然也可以用多工完成,但是任務越多,花在工作切換的時間就越多,CPU執行任務的效率就越低。計算密集型任務由於主要消耗CPU資源,這類任務用Python這樣的指令碼語言去執行效率通常很低,最能勝任這類任務的是C語言,我們之前提到了Python中有嵌入C/C++程式碼的機制 機製。

除了計算密集型任務,其他的涉及到網路、儲存媒介I/O的任務都可以視爲I/O密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待I/O操作完成(因爲I/O的速度遠遠低於CPU和記憶體的速度)。對於I/O密集型任務,如果啓動多工,就可以減少I/O等待時間從而讓CPU高效率的運轉。有一大類的任務都屬於I/O密集型任務,這其中包括了我們很快會涉及到的網路應用和Web應用。

說明: 上面的內容和例子來自於廖雪峯官方網站的《Python教學》,因爲對作者文中的某些觀點持有不同的看法,對原文的文字描述做了適當的調整。

單執行緒+非同步I/O

現代操作系統對I/O操作的改進中最爲重要的就是支援非同步I/O。如果充分利用操作系統提供的非同步I/O支援,就可以用單進程單執行緒模型來執行多工,這種全新的模型稱爲事件驅動模型。Nginx就是支援非同步I/O的Web伺服器,它在單核CPU上採用單進程模型就可以高效地支援多工。在多核CPU上,可以執行多個進程(數量與CPU核心數相同),充分利用多核CPU。用Node.js開發的伺服器端程式也使用了這種工作模式,這也是當下實現多工程式設計的一種趨勢。

在Python語言中,單執行緒+非同步I/O的程式設計模型稱爲協程,有了協程的支援,就可以基於事件驅動編寫高效的多工程式。協程最大的優勢就是極高的執行效率,因爲子程式切換不是執行緒切換,而是由程式自身控制,因此,沒有執行緒切換的開銷。協程的第二個優勢就是不需要多執行緒的鎖機制 機製,因爲只有一個執行緒,也不存在同時寫變數衝突,在協程中控制共用資源不用加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。如果想要充分利用CPU的多核特性,最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的效能。關於這方面的內容,我稍後會做一個專題來進行講解。

應用案例

例子1:將耗時間的任務放到執行緒中以獲得更好的使用者體驗。

如下所示的介面中,有「下載」和「關於」兩個按鈕,用休眠的方式模擬點選「下載」按鈕會聯網下載檔案需要耗費10秒的時間,如果不使用「多執行緒」,我們會發現,當點選「下載」按鈕後整個程式的其他部分都被這個耗時間的任務阻塞而無法執行了,這顯然是非常糟糕的使用者體驗,程式碼如下所示。

import time
import tkinter
import tkinter.messagebox


def download():
    # 模擬下載任務需要花費10秒鐘時間
    time.sleep(10)
    tkinter.messagebox.showinfo('提示', '下載完成!')


def show_about():
    tkinter.messagebox.showinfo('關於', '作者: 駱昊(v1.0)')


def main():
    top = tkinter.Tk()
    top.title('單執行緒')
    top.geometry('200x150')
    top.wm_attributes('-topmost', True)

    panel = tkinter.Frame(top)
    button1 = tkinter.Button(panel, text='下載', command=download)
    button1.pack(side='left')
    button2 = tkinter.Button(panel, text='關於', command=show_about)
    button2.pack(side='right')
    panel.pack(side='bottom')

    tkinter.mainloop()


if __name__ == '__main__':
    main()

如果使用多執行緒將耗時間的任務放到一個獨立的執行緒中執行,這樣就不會因爲執行耗時間的任務而阻塞了主執行緒,修改後的程式碼如下所示。

import time
import tkinter
import tkinter.messagebox
from threading import Thread


def main():

    class DownloadTaskHandler(Thread):

        def run(self):
            time.sleep(10)
            tkinter.messagebox.showinfo('提示', '下載完成!')
            # 啓用下載按鈕
            button1.config(state=tkinter.NORMAL)

    def download():
        # 禁用下載按鈕
        button1.config(state=tkinter.DISABLED)
        # 通過daemon參數將執行緒設定爲守護執行緒(主程式退出就不再保留執行)
        # 線上程中處理耗時間的下載任務
        DownloadTaskHandler(daemon=True).start()

    def show_about():
        tkinter.messagebox.showinfo('關於', '作者: 駱昊(v1.0)')

    top = tkinter.Tk()
    top.title('單執行緒')
    top.geometry('200x150')
    top.wm_attributes('-topmost', 1)

    panel = tkinter.Frame(top)
    button1 = tkinter.Button(panel, text='下載', command=download)
    button1.pack(side='left')
    button2 = tkinter.Button(panel, text='關於', command=show_about)
    button2.pack(side='right')
    panel.pack(side='bottom')

    tkinter.mainloop()


if __name__ == '__main__':
    main()

例子2:使用多進程對複雜任務進行「分而治之」。

我們來完成1~100000000求和的計算密集型任務,這個問題本身非常簡單,有點回圈的知識就能解決,程式碼如下所示。

from time import time


def main():
    total = 0
    number_list = [x for x in range(1, 100000001)]
    start = time()
    for number in number_list:
        total += number
    print(total)
    end = time()
    print('Execution time: %.3fs' % (end - start))


if __name__ == '__main__':
    main()

在上面的程式碼中,我故意先去建立了一個列表容器然後填入了100000000個數,這一步其實是比較耗時間的,所以爲了公平起見,當我們將這個任務分解到8個進程中去執行的時候,我們暫時也不考慮列表切片操作花費的時間,只是把做運算和合併運算結果的時間統計出來,程式碼如下所示。

from multiprocessing import Process, Queue
from random import randint
from time import time


def task_handler(curr_list, result_queue):
    total = 0
    for number in curr_list:
        total += number
    result_queue.put(total)


def main():
    processes = []
    number_list = [x for x in range(1, 100000001)]
    result_queue = Queue()
    index = 0
    # 啓動8個進程將數據切片後進行運算
    for _ in range(8):
        p = Process(target=task_handler,
                    args=(number_list[index:index + 12500000], result_queue))
        index += 12500000
        processes.append(p)
        p.start()
    # 開始記錄所有進程執行完成花費的時間
    start = time()
    for p in processes:
        p.join()
    # 合併執行結果
    total = 0
    while not result_queue.empty():
        total += result_queue.get()
    print(total)
    end = time()
    print('Execution time: ', (end - start), 's', sep='')


if __name__ == '__main__':
    main()

比較兩段程式碼的執行結果(在我目前使用的MacBook上,上面的程式碼需要大概6秒左右的時間,而下面 下麪的程式碼只需要不到1秒的時間,再強調一次我們只是比較了運算的時間,不考慮列表建立及切片操作花費的時間),使用多進程後由於獲得了更多的CPU執行時間以及更好的利用了CPU的多核特性,明顯的減少了程式的執行時間,而且計算量越大效果越明顯。當然,如果願意還可以將多個進程部署在不同的計算機上,做成分佈式進程,具體的做法就是通過multiprocessing.managers模組中提供的管理器將Queue物件通過網路共用出來(註冊到網路上讓其他計算機可以存取),這部分內容也留到爬蟲的專題再進行講解。