Generator(生成器),入門初基,Coroutine(原生協程),登峰造極,Python3.10並行非同步程式設計async底層實現

2022-12-28 09:00:42

普遍意義上講,生成器是一種特殊的迭代器,它可以在執行過程中暫停並在恢復執行時保留它的狀態。而協程,則可以讓一個函數在執行過程中暫停並在恢復執行時保留它的狀態,在Python3.10中,原生協程的實現手段,就是生成器,或者說的更具體一些:協程就是一種特殊的生成器,而生成器,就是協程的入門心法。

協程底層實現

我們知道,Python3.10中可以使用async和await關鍵字來實現原生協程函數的定義和排程,但其實,我們也可以利用生成器達到協程的效果,生成器函數和普通函數的區別在於,生成器函數使用 yield 語句來暫停執行並返回結果。例如,下面是一個使用生成器函數實現的簡單協程:

def my_coroutine():  
    while True:  
        x = yield  
        print(x)  
  
# 使用生成器函數建立協程  
coroutine = my_coroutine()  
  
# 啟動協程  
next(coroutine)  
  
# 在協程中傳入資料  
coroutine.send(1)  
coroutine.send(2)  
coroutine.send(3)

程式返回:

➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"  
1  
2  
3

在上面的程式碼中,生成器函數 my_coroutine 使用了一個無限迴圈來實現協程的邏輯。每當呼叫 send 方法時,協程就會從 yield 語句處恢復執行,並將傳入的引數賦值給變數 x。

如此,就完成了協程執行-》阻塞-》切換-》回撥的工作流模式。

當然,作為事件迴圈機制,協程服務啟動可能無限期地執行,要關閉協程服務,可以使用生成器的close()方法。當一個協程被關閉時,它會生成GeneratorExit異常,該異常可以用生成器的方式進行捕獲:

def my_coroutine():  
    try :  
        while True:  
            x = yield  
            print(x)  
    except GeneratorExit:  
            print("協程關閉")  
  
# 使用生成器函數建立協程  
coroutine = my_coroutine()  
  
# 啟動協程  
next(coroutine)  
  
# 在協程中傳入資料  
coroutine.send(1)  
coroutine.send(2)  
coroutine.send(3)  
  
coroutine.close()

程式返回:

➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"  
1  
2  
3  
協程關閉

業務場景

在實際業務場景中,我們也可以使用生成器來模擬協程流程,主要體現在資料的IO流操作中,假設我們需要從本地往伺服器傳輸資料,首先建立連結物件:

class Connection:  
  
  
    def __init__(self, addr):  
        self.addr = addr  
  
    def transmit(self, data):  
        print(f"X: {data[0]}, Y: {data[1]} sent to {self.addr}")

隨後建立生成器函數:

def send_to_server(conn):  
    while True:  
        try:  
            raw_data = yield  
            raw_data = raw_data.split(' ')  
            coords = (float(raw_data[0]), float(raw_data[1]))  
            conn.transmit(coords)  
        except ConnectionError:  
            print("連結丟失,進行回撥")  
            conn = Connection("重新連線v3u.cn")

利用生成器呼叫連結類的transmit方法進行資料的模擬傳輸,如果連結斷開,則會觸發回撥重新連線,執行邏輯:

if __name__ == '__main__':  
  
  
    conn = Connection("v3u.cn")  
  
    sender = send_to_server(conn)  
    sender.send(None)  
  
    for i in range(1, 6):  
        sender.send(f"{100/i} {200/i}")  
  
    # 模擬連結斷開  
    conn.addr = None  
  
  
    sender.throw(ConnectionError)   
  
    for i in range(1, 6):  
        sender.send(f"{100/i} {200/i}")

程式返回:

X: 100.0, Y: 200.0 sent to v3u.cn  
X: 50.0, Y: 100.0 sent to v3u.cn  
X: 33.333333333333336, Y: 66.66666666666667 sent to v3u.cn  
X: 25.0, Y: 50.0 sent to v3u.cn  
X: 20.0, Y: 40.0 sent to v3u.cn  
連結丟失,進行回撥  
X: 100.0, Y: 200.0 sent to 重新連線v3u.cn  
X: 50.0, Y: 100.0 sent to 重新連線v3u.cn  
X: 33.333333333333336, Y: 66.66666666666667 sent to 重新連線v3u.cn  
X: 25.0, Y: 50.0 sent to 重新連線v3u.cn  
X: 20.0, Y: 40.0 sent to 重新連線v3u.cn

如此,我們就可以利用生成器的「狀態保留」機制來控制網路連結突然斷開的回撥補救措施了。

所以說,協程就是一種特殊的生成器:

async def test():  
    pass  
  
print(type(test())) 

您猜怎麼著?

<class 'coroutine'>

結語

誠然,生成器和協程也並非完全是一個概念,與生成器不同的是,協程可以被另一個函數(稱為呼叫方)恢復執行,而不是隻能由生成器本身恢復執行。這使得協程可以用來實現更復雜的控制流,因為它們可以在執行時暫停並在任意時刻恢復執行。