當執行緒在系統中執行時,執行緒的排程具有一定的透明性,通常程式無法準確控制執行緒的輪換執行,如果有需要,Python 可通過執行緒通訊來保證執行緒協調執行。
假設系統中有兩個執行緒,這兩個執行緒分別代表存款者和取錢者,現在假設系統有一種特殊的要求,即要求存款者和取錢者不斷地重復存款、取錢的動作,而且要求每當存款者將錢存入指定賬戶後,取錢者就立即取出該筆錢。不允許存款者連續兩次存錢,也不允許取錢者連續兩次取錢。
為了實現這種功能,可以藉助於 Condition 物件來保持協調。使用 Condition 可以讓那些己經得到 Lock 物件卻無法繼續執行的執行緒釋放 Lock 物件,Condition 物件也可以喚醒其他處於等待狀態的執行緒。
將 Condition 物件與 Lock 物件組合使用,可以為每個物件提供多個等待集(wait-set)。因此,Condition 物件總是需要有對應的 Lock 物件。從 Condition 的構造器
__init__(self, lock=None)
可以看出,程式在建立 Condition 時可通過 lock 引數傳入要繫結的 Lock 物件;如果不指定 lock 引數,在建立 Condition 時它會自動建立一個與之系結的 Lock 物件。
Condition 類提供了如下幾個方法:
-
acquire([timeout])/release():呼叫 Condition 關聯的 Lock 的 acquire() 或 release() 方法。
-
wait([timeout]):導致當前執行緒進入 Condition 的等待池等待通知並釋放鎖,直到其他執行緒呼叫該 Condition 的 notify() 或 notify_all() 方法來喚醒該執行緒。在呼叫該 wait() 方法時可傳入一個 timeout 引數,指定該執行緒最多等待多少秒。
-
notify():喚醒在該 Condition 等待池中的單個執行緒並通知它,收到通知的執行緒將自動呼叫 acquire() 方法嘗試加鎖。如果所有執行緒都在該 Condition 等待池中等待,則會選擇喚醒其中一個執行緒,選擇是任意性的。
-
notify_all():喚醒在該 Condition 等待池中等待的所有執行緒並通知它們。
本例程式中,可以通過一個旗標來標識賬戶中是否已有存款,當旗標為 False 時,表明賬戶中沒有存款,存款者執行緒可以向下執行,當存款者把錢存入賬戶中後,將旗標設為 True,並呼叫 Condition 的 notify() 或 notify_all() 方法來喚醒其他執行緒。
當存款者執行緒進入執行緒體後,如果旗標為 True,就呼叫 Condition 的 wait() 方法讓該執行緒等待。當旗標為 True 時,表明賬戶中已經存入了錢,取錢者執行緒可以向下執行,當取錢者把錢從賬戶中取出後,將旗標設為 False,並呼叫 Condition 的 notify() 或 notify_all() 方法來喚醒其他執行緒;當取錢者執行緒進入執行緒體後,如果旗標為 False,就呼叫 wait() 方法讓該執行緒等待。
本程式為 Account 類提供了 draw() 和 deposit() 兩個方法,分別對應於該賬戶的取錢和存款操作。因為這兩個方法可能需要並行修改 Account 類的 self.balance 成員變數的值,所以它們都使用 Lock 來控制執行緒安全。除此之外,這兩個方法還使用了 Condition 的 wait() 和 notify_all() 來控制執行緒通訊。
import threading
class Account:
# 定義構造器
def __init__(self, account_no, balance):
# 封裝賬戶編號、賬戶餘額的兩個成員變數
self.account_no = account_no
self._balance = balance
self.cond = threading.Condition()
# 定義代表是否已經存錢的旗標
self._flag = False
# 因為賬戶餘額不允許隨便修改,所以只為self._balance提供getter方法
def getBalance(self):
return self._balance
# 提供一個執行緒安全的draw()方法來完成取錢操作
def draw(self, draw_amount):
# 加鎖,相當於呼叫Condition系結的Lock的acquire()
self.cond.acquire()
try:
# 如果self._flag為假,表明賬戶中還沒有人存錢進去,取錢方法阻塞
if not self._flag:
self.cond.wait()
else:
# 執行取錢操作
print(threading.current_thread().name
+ " 取錢:" + str(draw_amount))
self._balance -= draw_amount
print("賬戶餘額為:" + str(self._balance))
# 將標識賬戶是否已有存款的旗標設為False
self._flag = False
# 喚醒其他執行緒
self.cond.notify_all()
# 使用finally塊來釋放鎖
finally:
self.cond.release()
def deposit(self, deposit_amount):
# 加鎖,相當於呼叫Condition系結的Lock的acquire()
self.cond.acquire()
try:
# 如果self._flag為真,表明賬戶中已有人存錢進去,存錢方法阻塞
if self._flag: # ①
self.cond.wait()
else:
# 執行存款操作
print(threading.current_thread().name
+ " 存款:" + str(deposit_amount))
self._balance += deposit_amount
print("賬戶餘額為:" + str(self._balance))
# 將表示賬戶是否已有存款的旗標設為True
self._flag = True
# 喚醒其他執行緒
self.cond.notify_all()
# 使用finally塊來釋放鎖
finally:
self.cond.release()
上面程式使用 Condition 的 wait() 和 notify_all() 方法進行控制,對存款者執行緒而言,當程式進入 deposit() 方法後,如果 self._flag 為 True,則表明賬戶中已有存款,程式呼叫 Condition 的 wait() 方法被阻塞;否則,程式向下執行存款操作,當存款操作執行完成後,系統將 self._flag 設為 True,然後呼叫 notify_all() 來喚醒其他被阻塞的執行緒。如果系統中有存款者執行緒,存款者執行緒也會被喚醒,但該存款者執行緒執行到 ① 號程式碼處時再次進入阻塞狀態,只有執行 draw() 方法的取錢者執行緒才可以向下執行。同理,取錢者執行緒的執行流程也是如此。
程式中的存款者執行緒迴圈 100 次重複存款,而取錢者執行緒則迴圈 100 次重複取錢,存款者執行緒和取錢者執行緒分別呼叫 Account 物件的 deposit()、draw() 方法來實現。主程式可以啟動任意多個“存款”執行緒和“取錢”執行緒,可以看到所有的“取錢”執行緒必須等“存款”執行緒存錢後才可以向下執行,而“存款”執行緒也必須等“取錢”執行緒取錢後才可以向下執行。主程式程式碼如下:
import threading
import Account
# 定義一個函數,模擬重複max次執行取錢操作
def draw_many(account, draw_amount, max):
for i in range(max):
account.draw(draw_amount)
# 定義一個函數,模擬重複max次執行存款操作
def deposit_many(account, deposit_amount, max):
for i in range(max):
account.deposit(deposit_amount)
# 建立一個賬戶
acct = Account.Account("1234567" , 0)
# 建立、並啟動一個“取錢”執行緒
threading.Thread(name="取錢者", target=draw_many,
args=(acct, 800, 100)).start()
# 建立、並啟動一個“存款”執行緒
threading.Thread(name="存款者甲", target=deposit_many,
args=(acct , 800, 100)).start();
threading.Thread(name="存款者乙", target=deposit_many,
args=(acct , 800, 100)).start()
threading.Thread(name="存款者丙", target=deposit_many,
args=(acct , 800, 100)).start()
執行該程式,可以看到存款者執行緒、取錢者執行緒交替執行的情形,每當存款者向賬戶中存入 800 元之後,取錢者執行緒就立即從賬戶中取出這筆錢。存款完成後賬戶餘額總是 800 元,取錢結束後賬戶餘額總是 0 元。
執行該程式,將會看到如圖 1 所示的結果。
圖 1 執行緒通訊的效果