C#多執行緒學習(三) 生產者和消費者

2023-04-21 12:01:38

C#多執行緒學習(三) 生產者和消費者

執行緒學習第一篇:C#多執行緒學習(一) 多執行緒的相關概念
執行緒學習第二篇:C#多執行緒學習(二) 如何操縱一個執行緒

前面說過,每個執行緒都有自己的資源,但是程式碼區是共用的,即每個執行緒都可以執行相同的函數。這可能帶來的問題就是幾個執行緒同時執行一個函數,導致資料的混亂,產生不可預料的結果,因此我們必須避免這種情況的發生。

C#提供了一個關鍵字lock,它可以把一段程式碼定義為互斥段(critical section),互斥段在一個時刻內只允許一個執行緒進入執行,而其他執行緒必須等待。在C#中,關鍵字lock定義如下:

lock(expression) statement_block

expression代表你希望跟蹤的物件,通常是物件參照。
如果你想保護一個類的範例,一般地,你可以使用this;
如果你想保護一個靜態變數(如互斥程式碼段在一個靜態方法內部),一般使用類名就可以了。

statement_block就是互斥段的程式碼,這段程式碼在一個時刻內只可能被一個執行緒執行。

下面是一個使用lock關鍵字的典型例子,在註釋裡說明了lock關鍵字的用法和用途。

範例如下:

using System;
using System.Threading;
namespace ThreadSimple
{
    internal class Account
    {
        int balance;
        Random r = new Random();
        internal Account(int initial)
        {
            balance = initial;
        }
        internal int Withdraw(int amount)
        {
            if (balance < 0)
            {
                //如果balance小於 0 則丟擲異常
                throw new Exception("Negative Balance");
            }
            //下面的程式碼保證在當前執行緒修改balance的值完成之前
            //不會有其他執行緒也執行這段程式碼來修改balance的值
            //因此,balance的值是不可能小於0的
            lock (this)
            {
                Console.WriteLine("Current Thread:"+Thread.CurrentThread.Name);
                //如果沒有lock關鍵字的保護,那麼可能在執行完if的條件判斷之後
                //另外一個執行緒卻執行了balance=balance-amount修改了balance的值
                //而這個修改對這個執行緒是不可見的,所以可能導致這時if的條件已經不成立了
                //但是,這個執行緒卻繼續執行balance=balance-amount,所以導致balance可能小於0
                if (balance >= amount)
                {
                    Thread.Sleep(5);
                    balance = balance - amount;
                    return amount;
                }
                else
                {
                    return 0;// transaction rejected
                }
            }
        }
        internal void DoTransactions()
        {
            for (int i = 0; i < 100; i++)
            Withdraw(r.Next(-50,100));
        }
    }
    internal class Test
    {
        static internal Thread[] threads = new Thread[10];
        public static void Main()
        {
            Account acc = new Account(0);
            for (int i = 0; i < 10; i++)
            {
                Thread t = new Thread(new ThreadStart(acc.DoTransactions));
                threads[i] = t;
            }
            for (int i = 0; i < 10; i++)
                threads[i].Name=i.ToString();
            for (int i = 0; i < 10; i++)
                threads[i].Start();
            Console.ReadLine();
        }
    }
}

Monitor 類鎖定一個物件

當多執行緒公用一個物件時,也會出現和公用程式碼類似的問題,這種問題就不應該使用lock關鍵字了,這裡需要用到System.Threading中的一個類Monitor,我們可以稱之為監視器,Monitor提供了使執行緒共用資源的方案。

Monitor類可以鎖定一個物件,一個執行緒只有得到這把鎖才可以對該物件進行操作。物件鎖機制保證了在可能引起混亂的情況下一個時刻只有一個執行緒可以存取這個物件。

Monitor必須和一個具體的物件相關聯,但是由於它是一個靜態的類,所以不能使用它來定義物件,而且它的所有方法都是靜態的,不能使用物件來參照。下面程式碼說明了使用Monitor鎖定一個物件的情形:

......
Queue oQueue = new Queue();
......
Monitor.Enter(oQueue);
......//現在oQueue物件只能被當前執行緒操縱了
Monitor.Exit(oQueue);//釋放鎖

如上所示,當一個執行緒呼叫Monitor.Enter()方法鎖定一個物件時,這個物件就歸它所有了,其它執行緒想要存取這個物件,只有等待它使用Monitor.Exit()方法釋放鎖。為了保證執行緒最終都能釋放鎖,你可以把Monitor.Exit()方法寫在try-catch-finally結構中的finally程式碼塊裡。

對於任何一個被Monitor鎖定的物件,記憶體中都儲存著與它相關的一些資訊:
其一是現在持有鎖的執行緒的參照;
其二是一個預備佇列,佇列中儲存了已經準備好獲取鎖的執行緒;
其三是一個等待佇列,佇列中儲存著當前正在等待這個物件狀態改變的佇列的參照。

當擁有物件鎖的執行緒準備釋放鎖時,它使用Monitor.Pulse()方法通知等待佇列中的第一個執行緒,於是該執行緒被轉移到預備佇列中,當物件鎖被釋放時,在預備佇列中的執行緒可以立即獲得物件鎖。

下面是一個展示如何使用lock關鍵字和Monitor類來實現執行緒的同步和通訊的例子,也是一個典型的生產者與消費者問題。
這個例程中,生產者執行緒和消費者執行緒是交替進行的,生產者寫入一個數,消費者立即讀取並且顯示(註釋中介紹了該程式的精要所在)。

用到的系統名稱空間如下:

using System;
using System.Threading;

首先,定義一個被操作的物件的類Cell,在這個類裡,有兩個方法:ReadFromCell()WriteToCell()。消費者執行緒將呼叫ReadFromCell()讀取cellContents的內容並且顯示出來,生產者程序將呼叫WriteToCell()方法向cellContents寫入資料。

範例如下:

public class Cell
{
    int cellContents;//Cell物件裡邊的內容
    bool readerFlag = false;//狀態標誌,為true時可以讀取,為false則正在寫入
    public int ReadFromCell()
    {
        lock(this)//Lock關鍵字保證了什麼,請大家看前面對lock的介紹
        {
            if (!readerFlag)//如果現在不可讀取
            {
                try
                {
                    //等待WriteToCell方法中呼叫Monitor.Pulse()方法
                    Monitor.Wait(this);
                }
                catch (SynchronizationLockException e)
                {
                    Console.WriteLine(e);
                }
                catch (ThreadInterruptedException e)
                {
                    Console.WriteLine(e);
                }
            }
            Console.WriteLine("Consume: {0}",cellContents);
            readerFlag = false;
            //重置readerFlag標誌,表示消費行為已經完成
            Monitor.Pulse(this);
            //通知WriteToCell()方法(該方法在另外一個執行緒中執行,等待中)
        }
        return cellContents;
    }

    public void WriteToCell(int n)
    {
        lock(this)
        {
            if (readerFlag)
            {
                try
                {
                    Monitor.Wait(this);
                }
                catch (SynchronizationLockException e)
                {
                    //當同步方法(指Monitor類除Enter之外的方法)在非同步的程式碼區被呼叫
                    Console.WriteLine(e);
                }
                catch (ThreadInterruptedException e)
                {
                    //當執行緒在等待狀態的時候中止
                    Console.WriteLine(e);
                }
            }
            cellContents = n;
            Console.WriteLine("Produce: {0}",cellContents);
            readerFlag = true;
            Monitor.Pulse(this);
            //通知另外一個執行緒中正在等待的ReadFromCell()方法
        }
    }
}

下面定義生產者類 CellProd 和消費者類 CellCons ,它們都只有一個方法ThreadRun(),以便在Main()函數中提供給執行緒的ThreadStart代理物件,作為執行緒的入口。

public class CellProd
{
    Cell cell; //被操作的Cell物件
    int quantity = 1; //生產者生產次數,初始化為1
    public CellProd(Cell box, int request)//建構函式
    {
        cell = box;
        quantity = request;
    }
    public void ThreadRun()
    {
        for(int looper = 1; looper<=quantity; looper++)
            cell.WriteToCell(looper); //生產者向操作物件寫入資訊
    }
}
public class CellCons
{
    Cell cell;
    int quantity = 1;
    public CellCons(Cell box, int request)//建構函式
    {
        cell = box;
        quantity = request;
    }
    public void ThreadRun()
    {
        int valReturned;
        for(int looper = 1; looper<=quantity; looper++)
            valReturned=cell.ReadFromCell();//消費者從操作物件中讀取資訊
    }
}

然後在下面這個類MonitorSample的Main()函數中,我們要做的就是建立兩個執行緒分別作為生產者和消費者,使用CellProd.ThreadRun()方法和CellCons.ThreadRun()方法對同一個Cell物件進行操作。

public class MonitorSample
{
    public static void Main(String[] args)
    {
        int result = 0;//一個標誌位,如果是0表示程式沒有出錯,如果是1表明有錯誤發生
        Cell cell = new Cell();
        //下面使用cell初始化CellProd和CellCons兩個類,生產和消費次數均為 20 次
        CellProd prod = new CellProd(cell, 20);
        CellCons cons = new CellCons(cell, 20);
        Thread producer = new Thread(new ThreadStart(prod.ThreadRun));
        Thread consumer = new Thread(new ThreadStart(cons.ThreadRun));
        //生產者執行緒和消費者執行緒都已經被建立,但是沒有開始執行
        try
        {
            producer.Start();
            consumer.Start();
            producer.Join();
            consumer.Join();
            Console.ReadLine();
        }
        catch (ThreadStateException e)
        {
            //當執行緒因為所處狀態的原因而不能執行被請求的操作
            Console.WriteLine(e);
            result = 1;
        }
        catch (ThreadInterruptedException e)
        {
            //當執行緒在等待狀態的時候中止
            Console.WriteLine(e);
            result = 1;
        }
        //儘管Main()函數沒有返回值,但下面這條語句可以向父程序返回執行結果
        Environment.ExitCode = result;
    }
}

在上面的例程中,同步是通過等待Monitor.Pulse()來完成的。首先生產者生產了一個值,而同一時刻消費者處於等待狀態,直到收到生產者的「脈衝(Pulse)」通知它生產已經完成,此後消費者進入消費狀態,而生產者開始等待消費者完成操作後將呼叫Monitor.Pulese()發出的「脈衝」。

它的執行結果很簡單:

Produce: 1
Consume: 1
Produce: 2
Consume: 2
Produce: 3
Consume: 3
...
...
Produce: 20
Consume: 20

事實上,這個簡單的例子已經幫助我們解決了多執行緒應用程式中可能出現的大問題,只要領悟瞭解決執行緒間衝突的基本方法,很容易把它應用到比較複雜的程式中去。