Python非同步協程(asyncio詳解)

2022-11-29 12:00:08

續上篇講解yield from部落格,上篇連結:https://www.cnblogs.com/Red-Sun/p/16889182.html
PS:本部落格是個人筆記分享,不需要掃碼加群或必須關注什麼的(如果外站需要加群或關注的可以直接去我主頁檢視)
歡迎大家光臨ヾ(≧▽≦*)o我的部落格首頁https://www.cnblogs.com/Red-Sun/
首先要了解什麼是協程,其次知道非同步跟同步的區別。(PS:個人喜歡多做比喻,不恰當地方望指正)
本文僅僅是個人學習筆記,有錯的地方望各位指點。
如果把程序比作從A處到B處去這件事,那麼執行緒就是可供選擇的多條道路,協程就是道路上特殊路段(類似限速,一整條道路都是特殊路段的話,就是全部由協程實現)
例圖如下:

1. 什麼是協程(Coroutines)

在瞭解非同步之前,先大致瞭解一下什麼是協程。
網上的講法有各種:

  • 協程是一種比執行緒更加輕量級的存在
  • 協程是一種使用者級的輕量級執行緒
  • 協程,又稱微執行緒

大體看過之後就感覺,我好像懂了,有好像沒懂,個人感覺有點暈乎乎的,沒太明白。(PS:可能是我個人智商沒夠不能快速領悟的原因)
個人理解(PS:不涉及其本質來源、底層實現、僅僅就著這個非同步爬蟲來說):協程就像一條帶應急車道的高速公路(具體作用就是讓任務有了暫停切換功能)
執行緒:把需要執行的任務比作汽車,執行緒就像一條單行且只有一條道的高速公路,只有等前一輛車到達終點後面的車才能出發,如果其中一輛出了事情停在了路上,那麼這倆車後面的車就只能原地等待直到它恢復併到達終點才能繼續上路。
協程:把需要執行的任務比作汽車,協程就像一條帶應急車道的高速公路,如果汽車在中途出了問題就可以直接到一邊的應急車道停下處理問題,下一輛車可以直接上路,簡單來說就是可以通過程式控制哪輛車行駛,哪輛車在應急車道休息。

2.同步跟非同步

同步跟非同步是兩個相對的概念:
同步:意味著有序
非同步:意味著無序
小故事模擬事件:
小明在家需要完成如下事情:

  1. 電飯鍋煮飯大約30分鐘
  2. 洗衣機洗衣服大約40分鐘
  3. 寫作業大約50分鐘

在同步情況下:小明需要電飯鍋處等待30分鐘、洗衣機處等待40分鐘、寫作業50分鐘,總計花費時間120分鐘。
在非同步情況下:小明需要電飯鍋處理並啟動花費10分鐘、洗衣機處理並啟動花費10分鐘,寫作業花費50分鐘,總計花費時間70分鐘。
即同步必須一件事情結束之後再進行下一件事,非同步是可以在一件事情沒結束就去處理另外一件事情了。
注意:此處非同步比同步耗時更短是有前提條件的!要是I/O阻塞才可以(說人話:類似電飯鍋煮飯,電飯鍋可以自行完成這種的)
如果把條件中的電飯鍋換成柴火,洗衣機換成搓衣板,那麼事情就只能一件一件完成了,兩者耗時相近。

3.asyncio非同步協程

asyncio即Asynchronous I/O是python一個用來處理並行(concurrent)事件的包,是很多python非同步架構的基礎,多用於處理高並行網路請求方面的問題。
此處使用的是Python 3.5之後出現的async/await來實現協程,需要yield實現協程的可以去我上篇部落格瞅瞅:點選此處快速跳轉

基礎補充(比較基礎的內容懂的可以直接跳)

  1. 普通函數
def function():
    return 1

2.由async做字首的普通函數變成了非同步函數

async def asynchronous():
    return 1

而非同步函數不同於普通函數不可能被直接呼叫

async def asynchronous():
    return 1

print(asynchronous())


嘗試用send驅動這個協程

async def asynchronous():
    return 1

asynchronous().send(None)


值有了不過儲存在了這個StopIteration報錯中,於是有了下方的執行器

# -*- coding: utf-8 -*-
# @Time    : 2022/11/22 16:03
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : async_function.py
# @Software: PyCharm

async def asynchronous():
    return 1


def run(async_function):  # 用try解決報錯問題,執行協程函數
    try:
        async_function().send(None)
    except StopIteration as r:
        return r.value


print(run(asynchronous))


成功執行(`・ω・´)ゞ(`・ω・´)ゞ


在協程函數中await的使用(PS:await只能使用在有async修飾的函數中不然會報錯)
await的作用是掛起自身的協程,直到await修飾的協程完成並返回結果(可參照第一點什麼是協程中的描述)

# -*- coding: utf-8 -*-
# @Time    : 2022/11/22 16:03
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : await_function.py
# @Software: PyCharm

async def asynchronous():
    return 1

async def await_function():  # await掛起自身函數,等待另外協程函數執行完畢
    result = await asynchronous()
    return result

def run(async_function):  # 用try解決報錯問題,執行協程函數
    try:
        async_function().send(None)
    except StopIteration as r:
        return r.value


print(run(await_function))


執行流程 run函數->await_function函數->執行到await時->await_function掛起(暫停等待)->asynchronous函數執行並返回1 ->await_function繼續執行返回result ->print列印result值

使用進階

對asyncio的使用首先要了解:

  1. 事件迴圈

建立一個迴圈類似不停執行的洗衣機,把事件(類似衣服)放到迴圈中,個人描述就像是將需要清洗的衣服都放到洗衣機中一共處理。

  1. Future

Future物件表示未完成的計算,還未完成的結果(PS:等待要洗的衣服們(假想成髒衣服堆))

  1. Task

是Future的子類,作用是在執行某個任務的同時可以並行的執行多個任務。(PS:那個髒衣服堆中的單獨一件,可以被扔到洗衣機洗的髒衣服)

3.8版本之前的程式碼

先講需要自己建立loop的後面再講3.8更新後的更容易記憶一點(PS:3.8的更為簡約想直接看3.8版的也可)

1.下面是一個基礎的執行範例
# -*- coding: utf-8 -*-
# @Time    : 2022/11/24 17:32
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例1.py
# @Software: PyCharm

import asyncio
import time


async def async_function():  # async修飾的非同步函數,在該函數中可以新增await進行暫停並切換到其他非同步函數中
    now_time = time.time()
    await asyncio.sleep(1)  # 當執行await future這行程式碼時(future物件就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    print('花費時間:{}秒'.format(time.time()-now_time))

event = async_function()  # 建立協程事件物件

loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件迴圈物件
loop.run_until_complete(event)  # 通過run_until_complete方法直接執行event,該方法會一直等待直到event執行完畢
loop.close()  # 結束迴圈


2.關於task物件的操作
(1)建立任務物件並列印其狀態
# -*- coding: utf-8 -*-
# @Time    : 2022/11/24 17:32
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例2.py
# @Software: PyCharm

import asyncio
import time


async def async_function():  # async修飾的非同步函數,在該函數中可以新增await進行暫停並切換到其他非同步函數中
    now_time = time.time()
    await asyncio.sleep(1)  # 當執行await future這行程式碼時(future物件就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    print('花費時間:{}秒'.format(time.time()-now_time))

event = async_function()  # 建立協程事件物件

loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件迴圈物件
task = loop.create_task(event)  # 建立任務物件
print(task)  # 任務執行中task
loop.run_until_complete(task)  # 等待task執行完畢
print(task)  # 任務執行結束task狀態
loop.close()  # 結束迴圈


執行中:狀態顯示為running
執行結束後:狀態顯示done,result為協程函數返回值,因為此函數無返回值所以為None

(2)獲取task返回值
  • 方法一:通過task.result()的方法獲取返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 10:40
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例3.py
# @Software: PyCharm

import asyncio
import time


async def async_function():  # async修飾的非同步函數,在該函數中可以新增await進行暫停並切換到其他非同步函數中
    now_time = time.time()
    await asyncio.sleep(1)  # 當執行await future這行程式碼時(future物件就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    return '花費時間:{}秒'.format(time.time() - now_time)  # 將列印語句換成返回值


event = async_function()  # 建立協程事件物件

loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件迴圈物件
task = loop.create_task(event)  # 建立任務物件
print(task)  # 任務執行中task
try:
    print(task.result())  # 任務未完成列印result會丟擲InvalidStateError錯誤
except asyncio.InvalidStateError as r:
    print(r)  # InvalidStateError報錯資訊
loop.run_until_complete(task)  # 等待task執行完畢
print(task)  # 任務執行結束task狀態
print(task.result())  # 列印出task的返回值
loop.close()  # 結束迴圈

  • 方法二:通過add_done_callback()新增完成回撥
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 11:15
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例4.py
# @Software: PyCharm

import asyncio
import time


def task_callback(future):  # 回撥函數獲取任務完成後的返回值
    print(future.result())

async def async_function():  # async修飾的非同步函數,在該函數中可以新增await進行暫停並切換到其他非同步函數中
    now_time = time.time()
    await asyncio.sleep(1)  # 當執行await future這行程式碼時(future物件就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    return '花費時間:{}秒'.format(time.time() - now_time)  # 將列印語句換成返回值


event = async_function()  # 建立協程事件物件
loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件迴圈物件
task = loop.create_task(event)  # 建立任務物件

task.add_done_callback(task_callback)  # 為而任務新增回撥函數

loop.run_until_complete(task)  # 等待task執行完畢
loop.close()  # 結束迴圈


通過 Future 的 add_done_callback() 方法來新增回撥函數,當任務完成後,程式會自動觸發該回撥函數,並將對應的 Future 物件作為引數傳給該回撥函數。
PS:Function 'add_done_callback' doesn't return anything(函數「add_done_callback」不返回任何內容)

3.多工tasks的實現
(1)通過asyncio.wait()來控制多工
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 14:12
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例5.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # async修飾的非同步函數,在該函數中可以新增await進行暫停並切換到其他非同步函數中
    await asyncio.sleep(num)  # 當執行await future這行程式碼時(future物件就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    print('協程花費時間:{}秒'.format(time.time() - now_time))  

now_time = time.time()  # 程式執行時的時間戳
events = [async_function(num=num) for num in range(1, 4)]  # 建立協程事件列表
loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件迴圈物件
tasks = asyncio.wait(events)  # 通過asyncio.wait(events)建立多工物件


loop.run_until_complete(tasks)  # 等待task執行完畢
loop.close()  # 結束迴圈
print('總執行花費時常:{}秒'.format(time.time() - now_time))

(2)多工獲取返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 15:38
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例6.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # async修飾的非同步函數,在該函數中可以新增await進行暫停並切換到其他非同步函數中
    await asyncio.sleep(num)  # 當執行await future這行程式碼時(future物件就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    return '協程花費時間:{}秒'.format(time.time() - now_time)


now_time = time.time()  # 程式執行時的時間戳
loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件迴圈物件
tasks = [loop.create_task(async_function(num=num)) for num in range(1, 4)]  # 通過事件迴圈的create_task方法建立任務列表
events = asyncio.wait(tasks)  # 通過asyncio.wait(tasks)將任務收集起來

loop.run_until_complete(events)  # 等待events執行完畢
for task in tasks:  # 遍歷迴圈列表,將對應任務返回值列印出來
    print(task.result())
loop.close()  # 結束迴圈

print('總執行花費時常:{}秒'.format(time.time() - now_time))

(3)通過add_done_callback()新增回撥
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 15:58
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例7.py
# @Software: PyCharm
import asyncio
import time


def task_callback(future):  # 回撥函數獲取任務完成後的返回值
    print(future.result())


async def async_function(num):  # async修飾的非同步函數,在該函數中可以新增await進行暫停並切換到其他非同步函數中
    await asyncio.sleep(num)  # 當執行await future這行程式碼時(future物件就是被await修飾的函數),首先future檢查它自身是否已經完成,如果沒有完成,掛起自身,告知當前的Task(任務)等待future完成。
    return '協程花費時間:{}秒'.format(time.time() - now_time)


now_time = time.time()  # 程式執行時的時間戳
loop = asyncio.get_event_loop()  # 通過get_event_loop方法獲取事件迴圈物件
tasks = []  # 任務收集列表(PS:就像髒衣服堆)
for num in range(1, 4):
    task = loop.create_task(async_function(num=num))  # 建立單個任務(單件髒衣服)
    task.add_done_callback(task_callback)  # 為每個任務新增對應的回撥函數
    tasks.append(task)
events = asyncio.wait(tasks)  # 通過asyncio.wait(tasks)將任務收集起來PS:想象成裝髒衣服的籃子

loop.run_until_complete(events)  # 等待events執行完畢

loop.close()  # 結束迴圈

print('紅後總執行花費時長:{}秒'.format(time.time() - now_time))

4.動態不停新增任務task實現

除了像上面第第三點那種設定迴圈一口氣執行的(就像把髒衣服一口氣塞進洗衣機),還可以一個一個執行(把髒衣服一件一件放進去)。
方法:另外建立一條執行緒,在其中建立一個一直迴圈的事件迴圈。(PS:換個大地方放下一臺能夠一直執行的洗衣機,就可以把髒衣服一件一件丟進去了)

(1)同步狀態下
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 14:22
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例8.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread


def thread_new_loop(loop):  # 建立執行緒版洗衣機
    asyncio.set_event_loop(loop)  # 線上程中呼叫loop需要使用set_event_loop方法指定loop
    loop.run_forever()  #  run_forever() 會永遠阻塞當前執行緒,直到有人停止了該loop為止。


def function(num):  # 同步執行的任務方法
    print('任務{}花費時間:{}秒'.format(num, time.time() - now_time))
    return '任務{}完成時間:{}秒'.format(num, time.time() - now_time)


now_time = time.time()  # 程式執行時的時間戳
new_loop = asyncio.new_event_loop()  # 建立一個新的loop,get_event_loop()只會在主執行緒建立新的event loop,其他執行緒中呼叫 get_event_loop() 則會報錯
t = Thread(target=thread_new_loop, args=(new_loop,))  # 建立執行緒
t.start()  # 啟動執行緒
even = new_loop.call_soon_threadsafe(function, 1)  # 呼叫call_soon_threadsafe實現回撥(詳細描述往下找)
even.cancel()  # 當call_soon_threadsafe物件執行cancel()方法就會取消該任務事件(當速度夠快有概率取消前已經執行)
new_loop.call_soon_threadsafe(function, 2)
new_loop.call_soon_threadsafe(function, 3)


loop.call_soon():傳入目標函數和引數,可以將目標函數放到事件迴圈loop中,返回值是一個 asyncio.Handle 物件,此物件內只有一個方法為 cancel()方法,用來取消回撥函數。
loop.call_soon_threadsafe() :比上一個多了個threadsafe保護執行緒安全。

(2)非同步狀態下

與同步相比,函數為非同步函數並且通過asyncio.run_coroutine_threadsafe()方法回撥

# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 16:16
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例9.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread


def thread_new_loop(loop):  # 建立執行緒版洗衣機
    asyncio.set_event_loop(loop)  # 線上程中呼叫loop需要使用set_event_loop方法指定loop
    loop.run_forever()  #  run_forever() 會永遠阻塞當前執行緒,直到有人停止了該loop為止。


async def async_function(num):  # 非同步執行的任務方法
    await asyncio.sleep(num)
    print('非同步任務{}花費時間:{}秒'.format(num, time.time() - now_time))
    return '非同步任務{}完成時間:{}秒'.format(num, time.time() - now_time)


now_time = time.time()  # 程式執行時的時間戳
new_loop = asyncio.new_event_loop()  # 建立一個新的loop,get_event_loop()只會在主執行緒建立新的event loop,其他執行緒中呼叫 get_event_loop() 則會報錯
t = Thread(target=thread_new_loop, args=(new_loop,))  # 建立執行緒
t.start()  # 啟動執行緒
even = asyncio.run_coroutine_threadsafe(async_function(1), new_loop)  # 呼叫asyncio.run_coroutine_threadsafe實現回撥
even.cancel()  # 當run_coroutine_threadsafe物件執行cancel()方法就會取消該任務事件(當速度夠快有概率取消前已經執行)
asyncio.run_coroutine_threadsafe(async_function(2), new_loop)
asyncio.run_coroutine_threadsafe(async_function(3), new_loop)
print('紅後主程序執行花費時長:{}秒'.format(time.time() - now_time))



因為使用了loop.run_forever()所以會一直啟用事件迴圈到stop()的呼叫終止。
若要主執行緒退出時子執行緒也退出,可以設定子執行緒為守護執行緒 t.setDaemon(True)需要線上程執行前設定。

3.8以後的(PS:只要簡單使用直接看這個就行)

執行協程的三種基本方式
async.run() 執行協程
async.create_task()建立task
async.gather()獲取返回值

(1)用run()執行協程
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 17:34
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例10.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread


async def async_function(num):  # 非同步執行的任務方法
    await asyncio.sleep(num)
    print('非同步任務{}完成時間:{}秒'.format(num, time.time() - now_time))


now_time = time.time()  # 程式執行時的時間戳
asyncio.run(async_function(1))  # 用asyncio.run直接執行協程引數為協程函數及其引數

(2)用create_task()建立task
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 17:37
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例11.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # 非同步執行的任務方法
    await asyncio.sleep(num)
    print('非同步任務{}完成時間:{}秒'.format(num, time.time() - now_time))


async def main():  # 非同步主函數用於排程其他非同步函數
    tasks = []  # tasks列表用於存放task
    for num in range(1, 4):
        tasks.append(asyncio.create_task(async_function(num)))
    for task in tasks:
        await task


now_time = time.time()  # 程式執行時的時間戳
asyncio.run(main())  # 用asyncio.run直接執行協程引數為協程函數及其引數
print('【紅後】最終執行時間:{}'.format(time.time() - now_time))


PS:必須先通過asyncio.create_task將task建立到event loop中,再通過await等待,如果直接用await等待則會導致非同步變同步

(3)用gather()收集返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/29 9:25
# @Author  : 紅後
# @Email   : [email protected]
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 範例12.py
# @Software: PyCharm
import asyncio
import time


async def async_function(num):  # 非同步執行的任務方法
    await asyncio.sleep(num)
    return '非同步任務{}完成時間:{}秒'.format(num, time.time() - now_time)


async def main():  # 非同步主函數用於排程其他非同步函數
    tasks = []  # tasks列表用於存放task
    for num in range(1, 4):
        tasks.append(asyncio.create_task(async_function(num)))
    response = await asyncio.gather(tasks[0], tasks[1], tasks[2])  # 將task作為引數傳入gather,等非同步任務都結束後返回結果列表
    print(response)

now_time = time.time()  # 程式執行時的時間戳
asyncio.run(main())  # 用asyncio.run直接執行協程引數為協程函數及其引數
print('【紅後】最終執行時間:{}'.format(time.time() - now_time))