同步執行緒


執行緒同步可以定義為一種方法,借助這種方法,可以確信兩個或更多的並行執行緒不會同時存取被稱為臨界區的程式段。 另一方面,正如我們所知道的那樣,臨界區是共用資源被存取的程式的一部分。 因此,同步是通過同時存取資源來確保兩個或更多執行緒不相互連線的過程。 下圖顯示了四個執行緒同時嘗試存取程式的臨界區。

為了使它更清楚,假設有兩個或更多執行緒試圖同時在列表中新增物件。 這種行為不能導致成功的結局,因為它會拋棄一個或所有的物件,或者它會完全破壞列表的狀態。 這裡同步的作用是每次只有一個執行緒可以存取列表。

執行緒同步的問題

在實現並行程式設計或應用同步基元時,可能會遇到問題。 在本節中,我們將討論兩個主要問題。 問題是 -

  • 死鎖
  • 競爭條件

1. 競爭條件

這是並行程式設計的主要問題之一。 對共用資源的並行存取可能會導致競爭狀態。 競爭條件可以定義為當兩個或更多執行緒可以存取共用資料,然後嘗試同時更改其值時發生的條件。 由此,變數的值可能是不可預知的,並且取決於進程的上下文切換的時間而變化。

範例

考慮這個例子來理解競爭條件的概念 -

第1步 - 在這一步中,需要匯入執行緒模組 -

import threading

第2步 - 現在,定義一個全域性變數,例如x,以及其值為0 -

x = 0

第3步 - 現在,需要定義increment_global()函式,該函式將在此全域性函式中執行x遞增1 -

def increment_global():

   global x
   x += 1

第4步 - 在這一步中,將定義taskofThread()函式,它將呼叫increment_global()函式達指定的次數; 在這個例子中,它是50000次 -

def taskofThread():

   for _ in range(50000):
      increment_global()

第5步 - 現在,定義建立執行緒t1t2main()函式。 兩者都將在start()函式的幫助下啟動,並等待join()函式完成作業。

def main():
   global x
   x = 0

   t1 = threading.Thread(target= taskofThread)
   t2 = threading.Thread(target= taskofThread)

   t1.start()
   t2.start()

   t1.join()
   t2.join()

第6步 - 現在,需要給出範圍,如想要呼叫main()函式的疊代次數。 在這裡,呼叫為5次。

if __name__ == "__main__":
   for i in range(5):
      main()
      print("x = {1} after Iteration {0}".format(i,x))

在下面顯示的輸出中,我們可以看到競爭條件的影響,因為在每次疊代之後x的值預計為100000。但是,值有很大的變化。 這是由於執行緒對共用全域性變數x的並行存取。

x = 100000 after Iteration 0
x = 54034 after Iteration 1
x = 80230 after Iteration 2
x = 93602 after Iteration 3
x = 93289 after Iteration 4

處理使用鎖定的競爭條件

正如我們已經看到上述程式中競爭條件的影響,我們需要一個同步工具,它可以處理多個執行緒之間的競爭條件。 在Python中,threading模組提供了Lock類來處理競爭條件。 此外,Lock類提供了不同的方法,可以通過它幫助處理多個執行緒之間的競爭條件。 方法如下所述 -

acquire()方法
該方法用於獲取,即阻止鎖定。 鎖可以是阻塞或非阻塞取決於以下真或假的值 -

  • 將值設定為True - 如果使用預設引數True呼叫acquire()方法,則執行緒執行將被阻止,直到解鎖鎖。

  • 將值設定為False - 如果acquire()方法使用False呼叫,而False不是預設引數,那麼執行緒執行不會被阻塞,直到它被設定為true,即直到它被鎖定。

release()方法
此方法用於釋放鎖。 以下是與此方法相關的一些重要任務 -

  • 如果鎖定被鎖定,那麼release()方法將解鎖它。 它的工作是如果多個執行緒被阻塞並且等待鎖被解鎖,則只允許一個執行緒繼續。
  • 如果鎖已經解鎖,它將引發一個ThreadError錯誤。

現在,我們可以用鎖類及其方法重寫上述程式,以避免競爭條件。 我們需要使用lock引數定義taskofThread()方法,然後使用acquire()release()方法來阻塞和非阻塞鎖以避免競爭狀況。

範例

以下是用於理解處理競爭條件的鎖概念的python程式範例 -

import threading

x = 0

def increment_global():

   global x
   x += 1

def taskofThread(lock):

   for _ in range(50000):
      lock.acquire()
      increment_global()
      lock.release()

def main():
   global x
   x = 0

   lock = threading.Lock()
   t1 = threading.Thread(target = taskofThread, args = (lock,))
   t2 = threading.Thread(target = taskofThread, args = (lock,))

   t1.start()
   t2.start()

   t1.join()
   t2.join()

if __name__ == "__main__":
   for i in range(5):
      main()
      print("x = {1} after Iteration {0}".format(i,x))

以下輸出顯示競爭條件的影響被忽略; 在每次疊代之後,x的值現在是100000,這與該程式的期望值相同。

x = 100000 after Iteration 0
x = 100000 after Iteration 1
x = 100000 after Iteration 2
x = 100000 after Iteration 3
x = 100000 after Iteration 4

僵局 - 餐飲哲學家的問題

死鎖是設計並行系統時可能遇到的麻煩問題。可以在餐飲哲學家問題的幫助下說明這個問題,如下所示 -

Edsger Dijkstra最初介紹了餐飲哲學家問題,這是著名的並行系統最大問題和死鎖問題之一。

在這個問題中,有五位著名的哲學家坐在圓桌旁,從碗裡吃著一些食物。 五種哲學家可以使用五種叉子來吃他們的食物。 然而,哲學家決定同時使用兩把叉子來吃他們的食物。

現在,哲學家有兩個主要條件。 首先,每個哲學家既可以進食也可以處於思維狀態,其次,他們必須首先獲得叉子,即左邊和右邊的叉子。 當五位哲學家中的每一位設法同時選擇左叉時,問題就出現了。他們都在等待右叉是自由的,但他們在未吃完了食物之前永遠不會放棄他們的叉子,並且永遠不會有右叉。 因此,餐桌上會出現僵局。

並行系統中的死鎖
現在如果我們看到,並行系統也會出現同樣的問題。 上面例子中的叉子是系統資源,每個哲學家都可以表示這個競爭獲取資源的過程。

Python程式的解決方案

通過將哲學家分為兩種型別 - 貪婪的哲學家和慷慨的哲學家,可以找到解決這個問題的方法。 主要是一個貪婪的哲學家會嘗試拿起左邊的叉子,等到左邊的叉出現。 然後,他會等待右叉子在那裡,拿起來,吃,然後把它放下。 另一方面,一個慷慨的哲學家會嘗試拿起左邊的叉子,如果它不在那裡,他會等一段時間再試一次。 如果他們拿到左邊的叉子,他們會嘗試找到右叉子。 如果他們也會得到正確的叉子,那麼他們會吃飯並釋放叉子。 但是,如果他們不能得到右叉子,那麼他們也會釋放左叉子。

範例

以下Python程式將幫助找到解決哲學家就餐問題的方案 -

import threading
import random
import time

class DiningPhilosopher(threading.Thread):

   running = True

   def __init__(self, xname, Leftfork, Rightfork):
       threading.Thread.__init__(self)
       self.name = xname
       self.Leftfork = Leftfork
       self.Rightfork = Rightfork

   def run(self):
       while(self.running):
          time.sleep( random.uniform(3,13))
          print ('%s is hungry.' % self.name)
          self.dine()

   def dine(self):
       fork1, fork2 = self.Leftfork, self.Rightfork

       while self.running:
          fork1.acquire(True)
          locked = fork2.acquire(False)
          if locked: break
              fork1.release()
              print ('%s swaps forks' % self.name)
              fork1, fork2 = fork2, fork1
           else:
              return

       self.dining()
       fork2.release()
       fork1.release()

   def dining(self):
       print ('%s starts eating '% self.name)
       time.sleep(random.uniform(1,10))
       print ('%s finishes eating and now thinking.' % self.name)

def Dining_Philosophers():
   forks = [threading.Lock() for n in range(5)]
   philosopherNames = ('1st','2nd','3rd','4th', '5th')

   philosophers= [DiningPhilosopher(philosopherNames[i], forks[i%5], forks[(i+1)%5]) \
      for i in range(5)]

   random.seed()
   DiningPhilosopher.running = True
   for p in philosophers: p.start()
   time.sleep(30)
   DiningPhilosopher.running = False
   print (" It is finishing.")

Dining_Philosophers()

上面的程式使用了貪婪和慷慨的哲學家的概念。 該程式還使用了threading模組的Lock類的acquire()release()方法。 我們可以在下面的輸出中看到解決方案 -

4th is hungry.
4th starts eating
1st is hungry.
1st starts eating
2nd is hungry.
5th is hungry.
3rd is hungry.
1st finishes eating and now thinking.3rd swaps forks
2nd starts eating
4th finishes eating and now thinking.
3rd swaps forks5th starts eating
5th finishes eating and now thinking.
4th is hungry.
4th starts eating
2nd finishes eating and now thinking.
3rd swaps forks
1st is hungry.
1st starts eating
4th finishes eating and now thinking.
3rd starts eating
5th is hungry.
5th swaps forks
1st finishes eating and now thinking.
5th starts eating
2nd is hungry.
2nd swaps forks
4th is hungry.
5th finishes eating and now thinking.
3rd finishes eating and now thinking.
2nd starts eating 4th starts eating
It is finishing.