執行緒通訊


在現實生活中,如果一個人團隊正在共同完成任務,那麼他們之間應該有通訊,以便正確完成任務。 同樣的比喻也適用於執行緒。 在程式設計中,要減少處理器的理想時間,我們建立了多個執行緒,並為每個執行緒分配不同的子任務。 因此,必須有一個通訊設施,他們應該互相溝通交流,以同步的方式完成工作。

考慮以下與執行緒通訊相關的重要問題 -

  • 沒有效能增益 - 如果無法線上程和進程之間實現適當的通訊,那麼並行性和並行性的效能收益是沒有用的。
  • 完成任務 - 如果執行緒之間沒有適當的相互通訊機制,分配的任務將無法正常完成。
  • 比進程間通訊更高效 - 執行緒間通訊比進程間通訊更高效且更易於使用,因為進程內的所有執行緒共用相同的地址空間,並且不需要使用共用記憶體。

執行緒安全通訊的Python資料結構

多執行緒程式碼出現了將資訊從一個執行緒傳遞到另一個執行緒的問題。 標準的通訊原語不能解決這個問題。 因此,需要實現我們自己的組合物件,以便線上程之間共用物件以使通訊執行緒安全。 以下是一些資料結構,它們在進行一些更改後提供執行緒安全通訊 -

1. Set

為了以執行緒安全的方式使用set資料結構,需要擴充套件set類來實現我們自己的鎖定機制。

以下是一個擴充套件類的Python範例 -

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()

   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

在上面的例子中,定義了一個名為extend_class的類物件,它繼承自Python集合類。 在這個類別建構函式中建立一個鎖物件。 現在有兩個函式 - add()delete()。 這些函式被定義並且是執行緒安全的。 它們都依賴於超類功能以及一個鍵異常。

修飾器
這是執行緒安全通訊的另一個關鍵方法是使用裝飾器。

範例

考慮一個Python範例,展示如何使用裝飾器和mminus;

def lock_decorator(method):

   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

在上面的例子中,已經定義了一個名為lock_decorator的裝飾器方法,該方法從Python方法類繼承。 然後在這個類別建構函式中建立一個鎖物件。 現在有兩個函式 - add()delete()。 這些函式被定義並且是執行緒安全的。 他們都依靠超類功能和一個鍵異常。

2. list

列表資料結構對於臨時記憶體儲存而言是執行緒安全,快速以及簡單的結構。在Cpython中,GIL可以防止對它們的並行存取。當我們知道列表是執行緒安全的,但是資料在哪裡呢。實際上,該列表的資料不受保護。例如,如果另一個執行緒試圖做同樣的事情,則L.append(x)不保證能夠返回預期的結果。這是因為儘管append()是一個原子操作並且是執行緒安全的,但另一個執行緒試圖以併行方式修改列表資料,因此可以看到競爭條件對輸出的副作用。

為了解決這類問題並安全地修改資料,我們必須實現一個適當的鎖定機制,這進一步確保多個執行緒不會潛在競爭條件。為了實現適當的鎖定機制,可以像前面的例子那樣擴充套件這個類。

列表上的其他一些原子操作如下所示 -

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

這裡 -

  • LL1L2都是列表
  • DD1D2是字典
  • xy是物件
  • ij是整數

3. 佇列

如果清單資料不受保護,我們可能不得不面對後果。 可能會得到或刪除錯誤的資料項,競爭條件。 這就是為什麼建議使用佇列資料結構的原因。 一個真實世界的排隊範例可以是單車道單向道路,車輛首先進入,首先退出。 售票視窗和公共汽車站的佇列中可以看到更多真實世界的例子。

佇列是預設的執行緒安全資料結構,我們不必擔心實現複雜的鎖定機制。 Python提供了應用程式中使用不同型別佇列的模組。

佇列型別

在本節中,我們將獲得關於不同型別的佇列的資訊。 Python提供了三種從queue模組使用的佇列選項 -

  • 正常佇列(FIFO,先進先出)
  • 後進先出,後進先出
  • 優先順序

我們將在隨後的章節中了解不同的佇列。

正常佇列(FIFO,先進先出)
它是Python提供的最常用的佇列實現。 在這種先排隊的機制中,首先得到服務。 FIFO也被稱為正常佇列。 FIFO佇列可以表示如下 -

FIFO佇列的Python實現

在python中,FIFO佇列可以用單執行緒和多執行緒來實現。

具有單執行緒的FIFO佇列
要實現單執行緒的FIFO佇列,Queue類將實現一個基本的先進先出容器。 使用put()將元素新增到序列的一個「結尾」,並使用get()從另一端移除元素。

範例

以下是用單執行緒實現FIFO佇列的Python程式 -

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

執行上面範例程式碼,得到以下結果 -

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

輸出結果顯示上面的程式使用單個執行緒來說明這些元素將按照它們插入的順序從佇列中移除。

具有多個執行緒的FIFO佇列

為了實現多執行緒的FIFO,需要從queue模組擴充套件來定義myqueue()函式。 get()put()方法的工作方式與上面討論的一樣,只用單執行緒實現FIFO佇列。 然後為了使它成為多執行緒,我們需要宣告和範例化執行緒。 這些執行緒將以FIFO方式使用佇列。

範例
以下是用於實現具有多個執行緒的FIFO佇列的Python程式 -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

執行上面範例程式碼,得到以下結果 -

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

4. LIFO,後進先出佇列

佇列使用與FIFO(先進先出)佇列完全相反的類比。 在這個佇列機制中,最後一個將首先獲得服務。 這與實現堆疊資料結構相似。 LIFO佇列在實施深度優先搜尋時非常有用,如人工智慧演算法。

LIFO佇列的Python實現
在python中,LIFO佇列可以用單執行緒和多執行緒來實現。

單執行緒的LIFO佇列
要用單執行緒實現LIFO佇列,Queue類將使用結構Queue.LifoQueue來實現基本的後進先出容器。 現在,在呼叫put()時,將元素新增到容器的頭部,並使用get()從頭部移除。

範例
以下是用單執行緒實現LIFO佇列的Python程式 -

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")

執行上面範例程式碼,得到以下結果 -

item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

輸出顯示上述程式使用單個執行緒來說明元素將以插入的相反順序從佇列中移除。

帶有多個執行緒的LIFO佇列

這個實現與使用多執行緒實現FIFO佇列相似。 唯一的區別是需要使用Queue類,該類將使用結構Queue.LifoQueue來實現基本的後進先出容器。

範例
以下是用於實現具有多個執行緒的LIFO佇列的Python程式 -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

執行上面範例程式碼,得到以下結果 -

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

優先佇列

在FIFO和LIFO佇列中,專案順序與插入順序有關。 但是,有很多情況下優先順序比插入順序更重要。 讓我們考慮一個真實世界的例子。 假設機場的安保人員正在檢查不同類別的人員。 VVIP的人員,航空公司工作人員,海關人員,類別可能會優先檢查,而不是像到平民那樣根據到達情況進行檢查。

需要考慮優先佇列的另一個重要方面是如何開發任務排程器。 一種常見的設計是在佇列中優先處理最具代理性的任務。 該資料結構可用於根據佇列的優先順序值從佇列中提取專案。

優先佇列的Python實現
在python中,優先順序佇列可以用單執行緒和多執行緒來實現。

單執行緒優先佇列

要實現單執行緒優先佇列,Queue類將使用結構Queue.PriorityQueue在優先順序容器上實現任務。 現在,在呼叫put()時,元素將新增一個值,其中最低值將具有最高優先順序,並因此使用get()首先檢索。

範例

考慮下面的Python程式來實現單執行緒的優先順序佇列 -

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

執行上面範例程式碼,得到以下結果 -

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

在上面的輸出中,可以看到佇列已經儲存了基於優先順序的專案 - 較少的值具有高優先順序。

具有多執行緒的優先佇列

該實現類似於具有多個執行緒的FIFO和LIFO佇列的實現。 唯一的區別是需要使用Queue類通過使用結構Queue.PriorityQueue來初始化優先順序。 另一個區別是佇列的生成方式。 在下面給出的例子中,它將生成兩個相同的資料集。

範例
以下Python程式有助於實現具有多個執行緒的優先順序佇列 -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)

for i in range(5):
   q.put(i,1)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

執行上面範例程式碼,得到以下結果 -

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue