python協程asyncio的個人理解

2022-06-26 15:01:47

協程與任務

python語境中,協程 coroutine 的概念有兩個:協程函數、協程物件,協程物件由協程函數建立得到(類似於類範例化得到一個物件).

理解協程,最重要的是瞭解事件迴圈和任務執行的機制,下面是三個原則:

  • 事件迴圈中,不斷迴圈執行各個任務,若一個任務遇到await或者執行完成,則返回控制權給事件迴圈,這時候事件迴圈再去執行下一個任務
  • 事件迴圈同一時刻只會執行一個任務
  • 協程不會被加入事件迴圈的執行日程,只有被註冊為任務之後,事件迴圈才可以通過任務來設定日程以便並行執行協程

基本語法

協程的宣告和執行

使用async def語句定義一個協程函數,但這個函數不可直接執行

async def aaa():
    print('hello')

print(aaa())

# 輸出----------------------------------
<coroutine object aaa at 0x7f4f9a9dfec0>
/root/Project/test01/test2.py:4: RuntimeWarning: coroutine 'aaa' was never awaited
  print(aaa())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

如何執行一個協程呢,有三種方式:

  1. 使用asyncio.run()函數,可直接執行
import asyncio

async def aaa():
    print('hello')

asyncio.run(aaa())
# 輸出-------------------------
hello
  1. 使用await進行非同步等待

在協程函數中最重要的功能是使用await語法等待另一個協程,這將掛起當前協程,直到另一個協程返回結果。

await的作用:掛起 coroutine 的執行以等待一個 awaitable 物件。 只能在 coroutine function 內部使用。

import asyncio

async def aaa():
    print('hello')

async def main():
    await aaa()

asyncio.run(main())
  1. 使用asyncio.create_task() 函數來建立一個任務,放入事件迴圈中
import asyncio

async def aaa():
    print('hello')

async def main():
    asyncio.create_task(aaa())

asyncio.run(main())

可等待物件

上面說過,協程函數中最重要的功能是使用await語法等待另一個協程,這將掛起當前協程,直到另一個協程返回結果。(重要,重複一遍)

await後面需要跟一個可等待物件(awaitable),有下面三種可等待物件:

  • 協程:包括協程函數和協程物件
  • 任務:通過asyncio.create_task()函數將協程打包為一個任務
  • Futures:特殊的 低層級 可等待物件,表示一個非同步操作的 最終結果

執行asyncio程式

asyncio.run(coro, ***, debug=False)

傳入協程coroutine coro ,建立事件迴圈,執行協程返回結果,並在結束時關閉,應當被用作 asyncio 程式的主入口點。

建立任務

asyncio.create_task(coro, ***, name=None)

將 coro 協程 打包為一個 Task 排入日程準備執行。返回 Task 物件。

休眠

coroutine asyncio.sleep(delay, result=None, ***, loop=None)

阻塞 delay 指定的秒數,該協程總是會掛起當前任務,以允許其他任務執行

機制解析

通過官網的兩段程式碼,來詳細解析一下協程的執行機制。

官方兩個程式碼如下,注意看輸出差異:

程式碼1,通過協程物件來執行

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())				# 1: 建立事件迴圈,傳入入口點main()協程物件,此時生成一個對應的task

輸出為:

started at 17:13:52
hello
world
finished at 17:13:55

程式碼2,通過任務來執行

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

輸出:

started at 17:14:32
hello
world
finished at 17:14:34

注意到執行時間比前一個程式碼快1秒,下面說明為什麼出現這種情況(文字比較多)。

程式碼一的執行邏輯:

asyncio.run(main()) 啟動一個事件迴圈,將入口點main()協程物件傳入,生成一個對應的任務task_main;

事件迴圈執行任務task_main,然後執行第1條程式碼:print(f"started at {time.strftime('%X')}");

接著執行第2條程式碼:await say_after(1, 'hello'),第2條程式碼首先生成一個say_after(1, 'hello')協程物件,同時生成該協程物件對應的task_1;

由於await語法,task_main任務將控制權返回給事件迴圈,同時告訴事件迴圈需要等待task1才能繼續執行;

事件迴圈獲得控制權後,發現此時有兩個任務task_main和task1,同時task_main在等待task1,於是會去執行task1任務;

task1任務將執行第1條程式碼:await asyncio.sleep(1),同樣會生成asyncio.sleep(1)協程物件,以及對應的任務task2,同時因await語法將控制權返回給事件迴圈;

事件迴圈獲得控制權後,發現此時有三個任務task_main、task1、task2,由於task_main、task1都處於等待狀態,於是執行task3;

task3在1秒後執行完成,返回控制權給事件迴圈;

事件迴圈獲得控制權,發現此時有兩個任務task_main和task1,同時task_main在等待task1,於是會去執行task1任務;

task1任務執行第2條程式碼:print('hello'),執行完成後,任務也執行結束,將控制權返回給事件迴圈;

事件迴圈獲得控制權後,發現此時有一個任務task_main,於是接著執行下一條程式碼:await say_after(2, 'world'),繼續重複上述過程,直到這個協程任務結束;

task_main執行最後一條程式碼;

事件迴圈關閉退出;

程式碼二的執行邏輯:

asyncio.run(main()) 啟動一個事件迴圈,將入口點main()協程物件傳入,生成一個對應的任務task_main;

事件迴圈執行任務task_main,然後執行前幾條程式碼,建立兩個任務task1、task2,並註冊到事件迴圈中(此時事件迴圈一共有3個task),隨之執行程式直到await;

第一個await:await task1,這裡會阻塞當前任務task_main並將控制權返回給事件迴圈,事件迴圈獲取控制權,安排執行下一個任務task1;

task1任務開始執行,直至遇到await asyncio.sleep(1),asyncio.sleep(1)協程物件開始非同步執行,同時task1返回控制權給事件迴圈,事件迴圈獲取控制權後安排執行下一個任務task2;

task2任務開始執行,直至遇到await asyncio.sleep(2),asyncio.sleep(2)協程物件開始非同步執行,同時task2返回控制權給事件迴圈,事件迴圈獲取控制權後安排執行下一個任務;

此時3個任務均處於await狀態,事件迴圈保持等待;

1秒後asyncio.sleep(1)執行完成,task1取消阻塞,事件迴圈將安排task1執行,task1執行完成後返回控制權給事件迴圈,此時事件迴圈中一共兩個任務task_main、task2。

此時task2任務處於await狀態,而task_main也取消了阻塞,事件迴圈安排task_main執行,執行一行程式碼後遇到await task2,於是返回控制權給事件迴圈;

此時2個任務均處於await狀態,事件迴圈保持等待;

1秒後asyncio.sleep(2)執行完成,task2取消阻塞,事件迴圈將安排task2執行,task2執行完成後返回控制權給事件迴圈,此時事件迴圈中只剩任務task_main;

於是事件迴圈安排task_main執行,task_main執行完成,asyncio.()函數收到資訊也結束執行,整個程式結束

執行的流程圖示

(任務就緒後,就等待事件迴圈來呼叫了,此時需要await來阻塞主任務task_main,否則控制權一直在task_main手上,導致task_main任務執行完成,run()收到main()執行結束的訊息後,事件迴圈也關閉並結束,程式也將退出)

其實將第2個程式碼中的await task1刪除,只保留await task2,結果中的輸出相同,並消耗相同的總時間。但只保留await task1的話,將沒有task2的輸出;
如果將第2個程式碼中的await task1和await task2都刪除,換成await asyncio.sleep(3),一樣會列印相同輸出,不過總時間會變為3秒;

其中的原因需要理解協程的工作機制(事件迴圈和控制權)