Python Queue佇列實現執行緒通訊

2020-07-16 10:04:45
queue 模組下提供了幾個阻塞佇列,這些佇列主要用於實現執行緒通訊。在 queue 模組下主要提供了三個類,分別代表三種佇列,它們的主要區別就在於進佇列、出佇列的不同。

關於這三個佇列類的簡單介紹如下:
  1. queue.Queue(maxsize=0):代表 FIFO(先進先出)的常規佇列,maxsize 可以限制佇列的大小。如果佇列的大小達到佇列的上限,就會加鎖,再次加入元素時就會被阻塞,直到佇列中的元素被消費。如果將 maxsize 設定為 0 或負數,則該佇列的大小就是無限制的。
  2. queue.LifoQueue(maxsize=0):代表 LIFO(後進先出)的佇列,與 Queue 的區別就是出佇列的順序不同。
  3. PriorityQueue(maxsize=0):代表優先順序佇列,優先順序最小的元素先出佇列。

這三個佇列類的屬性和方法基本相同, 它們都提供了如下屬性和方法:
  • Queue.qsize():返回佇列的實際大小,也就是該佇列中包含幾個元素。
  • Queue.empty():判斷佇列是否為空。
  • Queue.full():判斷佇列是否已滿。
  • Queue.put(item, block=True, timeout=None):向佇列中放入元素。如果佇列己滿,且 block 引數為 True(阻塞),當前執行緒被阻塞,timeout 指定阻塞時間,如果將 timeout 設定為 None,則代表一直阻塞,直到該佇列的元素被消費;如果佇列己滿,且 block 引數為 False(不阻塞),則直接引發 queue.FULL 異常。
  • Queue.put_nowait(item):向佇列中放入元素,不阻塞。相當於在上一個方法中將 block 引數設定為 False。
  • Queue.get(item, block=True, timeout=None):從佇列中取出元素(消費元素)。如果佇列已滿,且 block 引數為 True(阻塞),當前執行緒被阻塞,timeout 指定阻塞時間,如果將 timeout 設定為 None,則代表一直阻塞,直到有元素被放入佇列中; 如果佇列己空,且 block 引數為 False(不阻塞),則直接引發 queue.EMPTY 異常。
  • Queue.get_nowait(item):從佇列中取出元素,不阻塞。相當於在上一個方法中將 block 引數設定為 False。

下面以普通的 Queue 為例介紹阻塞佇列的功能和用法。首先用一個最簡單的程式來測試 Queue 的 put() 和 get() 方法。
import queue

# 定義一個長度為2的阻塞佇列
bq = queue.Queue(2)
bq.put("Python")
bq.put("Python")
print("1111111111")
bq.put("Python")  # ① 阻塞執行緒
print("2222222222")
上面程式先定義了一個大小為 2 的 Queue,程式先向該佇列中放入兩個元素,此時佇列還沒有滿,兩個元素都可以被放入。當程式試圖放入第三個元素時,如果使用 put() 方法嘗試放入元素將會阻塞執行緒,如上面程式中 ① 號程式碼所示。

與此類似的是,在 Queue 已空的情況下,程式使用 get() 方法嘗試取出元素將會阻塞執行緒。

在掌握了 Queue 阻塞佇列的特性之後,在下面程式中就可以利用 Queue 來實現執行緒通訊了。
import threading
import time
import queue

def product(bq):
    str_tuple = ("Python", "Kotlin", "Swift")
    for i in range(99999):
        print(threading.current_thread().name + "生產者準備生產元組元素!")
        time.sleep(0.2);
        # 嘗試放入元素,如果佇列已滿,則執行緒被阻塞
        bq.put(str_tuple[i % 3])
        print(threading.current_thread().name 
            + "生產者生產元組元素完成!")
def consume(bq):
    while True:
        print(threading.current_thread().name + "消費者準備消費元組元素!")
        time.sleep(0.2)
        # 嘗試取出元素,如果佇列已空,則執行緒被阻塞
        t = bq.get()
        print(threading.current_thread().name 
            + "消費者消費[ %s ]元素完成!" % t)
# 建立一個容量為1的Queue
bq = queue.Queue(maxsize=1)
# 啟動3個生產者執行緒
threading.Thread(target=product, args=(bq, )).start()
threading.Thread(target=product, args=(bq, )).start()
threading.Thread(target=product, args=(bq, )).start()
# 啟動一個消費者執行緒
threading.Thread(target=consume, args=(bq, )).start()
上面程式啟動了三個生產者執行緒向 Queue 佇列中放入元素,啟動了三個消費者執行緒從 Queue 佇列中取出元素。本程式中 Queue 佇列的大小為 1,因此三個生產者執行緒無法連續放入元素,必須等待消費者執行緒取出一個元素後,其中的一個生產者執行緒才能放入一個元素。

執行該程式,將會看到如圖 1 所示的結果。


圖1 使用 Queue 控制執行緒通訊