C#多執行緒之執行緒高階(下)

2022-11-16 12:02:50

四、Monitor訊號構造

訊號構造本質:一個執行緒阻塞直到收到另一個執行緒發來的通知。

當多執行緒Wait同一物件時,就形成了一個「等待佇列(waiting queue)」,和用於等待獲得鎖的「就緒佇列(ready queue)」不同,每次呼叫Pulse時會釋放隊頭執行緒,它會進入就緒佇列,然後重新獲取鎖。可以把它想象成一個自動停車場,首先你在收費站(等待佇列)排隊驗票,然後在柵欄前(就緒佇列)排隊等待放行。

這個佇列結構天然有序,但是,對於Wait/Pulse應用通常不重要,在這種場景下把它想象成一個等待執行緒的「池(pool)」更好理解,每次呼叫Pulse都會從池中釋放一個等待執行緒。

PulseAll釋放整個等待佇列或者說等待池。收到Pulse的執行緒不會完全同時開始執行,而是有序的執行,因為每個Wait語句都要試圖重新獲取同一把鎖。他們的效果就是,PulseAll將執行緒從等待佇列移到就緒佇列中,讓它們可以繼續有序執行。

使用Wait/Pulse需要注意:

  • Wait / Pulse不能lock塊之外使用,否則會拋異常。
  • Pulse最多釋放一個執行緒,而PulseAll釋放所有執行緒。
  • Wait會立即釋放當前持有的鎖,然後進入阻塞,等待脈衝
  • 收到脈衝會立即嘗試重新獲取鎖,如果在指定時間內重新獲取,則返回true,如果在超過指定時間獲取,則返回false,如果沒有獲取鎖,則一直阻塞不會返回

Wait和Pulse

  1. 定義一個欄位,作為同步物件

    private readonly object _locker = new object();
    
  2. 定義一個或多個欄位,作為阻塞條件

    private bool _ok;
    
  3. 當你希望阻塞的時候

    Monitor.Wait在等待脈衝時,同步物件上的鎖會被釋放,並且進入阻塞狀態,直到收到 _locker上的脈衝,收到脈衝後重新獲取 _locker,如果此時 _locker 已經被別的執行緒佔有,則繼續阻塞,直至_獲取 _locker

    lock (_locker) 
    {
        while (!_ok)
        {
            Monitor.Wait (_locker);
        }
    }
    
  4. 當你希望改變阻塞條件時

    lock (_locker)
    {
        _ok = true;
        Monitor.Pulse(_locker);  // Monitor.PulseAll(_locker);
    }
    

WaitPulse幾乎是萬能的,通過一個bool標識我們就能實現AutoResetEvent/ManualResetEvent的功能,同理使用一個整形欄位,就可以實現CountdownEvent/Semaphore

效能方面,呼叫Pulse花費大概約是在等待控制程式碼上呼叫Set三分之一的時間。但是,使用WaitPulse進行訊號同步,對比事件等待控制程式碼有以下缺點:

  • Wait / Pulse不能跨越應用程式域和程序使用。

  • 必須通過鎖保護所有訊號同步邏輯涉及的變數。

等待超時

呼叫Wait方法時,你可以設定一個超時時間,可以是毫秒或TimeSpan的形式。如果因為超時而放棄了等待,那麼Wait方法就會返回false

public static bool Wait(object obj, TimeSpan timeout)

如果在超時到達時仍然沒有獲得一個脈衝,CLR會主動給它傳送一個虛擬的脈衝(virtual pulse),使其能夠重新獲得鎖,然後繼續執行,就像收到一個真實脈衝一樣。

下面這個例子非常有用,它可以定期的檢查阻塞條件。即使其它執行緒無法按照預期傳送脈衝,例如程式之後被其他人修改,但沒能正確使用Pulse,這樣也可以在一定程度上免疫 bug。因此在複雜的同步設計中可以給所有Wait指定超時時間。

lock (_locker)
  while (/* <blocking-condition> */)
    Monitor.Wait (_locker, /* <timeout> */);

Monitor.Wait的boolean型別返回值其實還可以這麼理解:其返回值意味著是否獲得了一個「真實的脈衝「。

如果」虛擬的脈衝「並不是期待的行為,可以記錄紀錄檔或丟擲異常。

Wait等待一個變數上的脈衝,Pulse對一個變數傳送脈衝。脈衝也是一種訊號形式,相對於事件等待控制程式碼那種鎖存(latching)訊號,脈衝顧名思義是一種非鎖存或者說易失的訊號

雙向訊號與競爭狀態

Monitor.Pulse是一種單向通訊機制:傳送脈衝的執行緒不關心發出的脈衝被誰收到了,他沒有返回值,不會阻塞,內部也沒有確認機制。

當一個執行緒發起一次脈衝:

  • 如果等待佇列中沒有任何執行緒,那麼這次發起的脈衝不會有任何效果。
  • 如果等待佇列中有執行緒,執行緒傳送完脈衝並釋放鎖後,並不能保證接到脈衝訊號的等待執行緒能立即開始工作。

然後我們有一些場景依賴等待執行緒能夠在收到脈衝後及時的響應,此時,雙向訊號出現了,這是一種自定義的確認機制。

在上文的訊號構造基礎上改造一個競爭狀態的案例:

public class 競爭狀態測試
{
    private readonly ITestOutputHelper _testOutputHelper;
    private readonly object _locker = new object();
    private bool _ok;

    public 競爭狀態測試(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    void Show()
    {
        new Thread(() =>  // Worker
        {
            for (int i = 0; i < 5; i++)
                lock (_locker)
                {
                    while (!_ok) Monitor.Wait(_locker);
                    _ok = false;
                    _testOutputHelper.WriteLine("Wassup?");
                }
        }).Start();

        for (int i = 0; i < 5; i++)
        {
            lock (_locker)
            {
                _ok = true;
                Monitor.Pulse(_locker);
            }
        }
    }
}

我們期待的結果:

Wassup?
Wassup?
Wassup?
Wassup?
Wassup?

實際上這個這個程式可能一次」Wassup?「都不會輸出:主執行緒可能在工作執行緒啟動之前完成,這五次Pulse啥事都沒幹

還記得我們講事件等待控制程式碼時,使用AutoResetEvent來模擬的雙向訊號嗎?現在使用Monitor來實現一個擴充套件性更好的版本

public class 雙向訊號測試
{
    private readonly ITestOutputHelper _testOutputHelper;
    private readonly object _locker = new();
    private bool _entry; // 我是否可以工作了
    private bool _ready; // 我是否可以繼續投遞了

    public 雙向訊號測試(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    void Show()
    {
        new Thread(() =>
        {
            Thread.Sleep(100);
            for (int i = 0; i < 5; i++)
            {
                lock (_locker)
                {
                    _ready = true;
                    Monitor.PulseAll(_locker);
                    while (!_entry) Monitor.Wait(_locker);
                    _entry = false;
                    _testOutputHelper.WriteLine("Wassup?");
                }
            }
        }).Start();

        for (int i = 0; i < 5; i++)
        {
            lock (_locker)
            {
                while (!_ready) Monitor.Wait(_locker);
                _ready = false;
                _entry = true;
                Monitor.PulseAll(_locker);
            }
        }
    }
}

我們仍然使用_ready來作為上游脈衝執行緒的自旋條件,使用_entry作為下游等待執行緒的自旋條件。由於我們的邏輯都在lock語句中,即使之後引入了第三個執行緒,我們的邏輯仍然不會出問題,_ready_entry的讀寫總是原子的。

升級生產消費佇列

  1. 這次,我們將允許多個消費者,各自擁有獨立的消費執行緒。使用一個陣列來存放這些執行緒,並且他們接收的不再是string,而是更加靈活的委託:

    private Thread[] _workers;
    private Queue<Action> _queue = new Queue<Action>();
    
  2. 和上次一樣,我們傳遞null來告知消費者執行緒退出:

    foreach (var worker in _workers)
    {
        AddTask(null);
    }
    
  3. 在告知消費執行緒退出後Join這些執行緒,等待未完成的任務被消費:

    foreach (var worker in _workers)
    { 
        worker.Join();
    }
    
  4. 每個工作執行緒會執行一個名為Consume的方法。我們在構造佇列時迴圈建立和啟動這些執行緒:

    _workers = new Thread[workerCount];
    for (int i = 0; i < workerCount; i++)
    {
        _workers[i] = new Thread(Consume);
        _workers[i].Start();
    }
    
  5. 消費Comsume方法,一個工作執行緒從佇列中取出並執行一個專案。我們希望工作執行緒沒什麼事情做的時候,或者說當佇列中沒有任何專案時,它們應該被阻塞。因此,我們的阻塞條件是_queue.Count == 0

    private void Consume()
    {
        while (true)
        {
            Action task;
            lock (_locker)
            {
                while (_queue.Count == 0)
                {
                    Monitor.Wait(_locker);  // 佇列裡沒任務,釋放鎖,進入等待
                }
                // 獲取新任務,重新持有鎖
                task = _queue.Dequeue();
            }
            
            if (task == null) return;  // 空任務代表退出
            task();  // 執行任務
        }
    }
    
  6. 新增一個任務。出於效率考慮,加入一個任務時,我們呼叫Pulse而不是PulseAll。這是因為每個專案只需要喚醒(至多)一個消費者。如果你只有一個冰激凌,你不會把一個班 30 個正在睡覺的孩子都叫起來排隊獲取它。

    public void AddTask(Action task)
    {
        lock (_locker)
        {
            _queue.Enqueue(task);
            Monitor.Pulse(_locker);
        }
    }
    

模擬等待控制程式碼

在雙向訊號中,你可能注意到了一個模式:_flag在當前執行緒被作為自旋阻塞條件,在另一執行緒中被設定為true,跳出自旋

lock(_locker)
{
    while (!_flag) Monitor.Wait(_locker);
	_flag = false;
}

ManualResetEvent

事實上它的工作原理就是模仿AutoResetEvent。如果去掉_flag=false,就得到了ManualResetEvent的基礎版本。

private readonly object _locker = new object();
private bool _signal;
void WaitOne()
{
    lock (_locker)
    {
        while (!_signal) Monitor.Wait(_locker);
    }
}
void Set()
{
    lock (_locker)
    {
        _signal = true;
        Monitor.PulseAll(_locker);
    }
}
void Reset()
{
    lock (_locker) _signal = false;
}

使用PulseAll,是因為可能存在多個被阻塞的等待執行緒。而EventWaitHandle.WaitOne()的通行條件就是:是開著的,ManualResetEvent被放行通過後不會自己關門,只能通過Reset將門關上,再次期間其它所有阻塞執行緒都能通行。

AutoResetEvent

實現AutoResetEvent非常簡單,只需要將WaitOne方法改為:

lock (_locker)
{
    while (!_signal) Monitor.Wait(_locker);
    _signal = false;  // 新增一條,自己關門
}

然後將Set方法改為:

lock (_locker)
{
    _signal = true;
    Monitor.Pulse(_locker);  // PulseAll替換成Pulse:
}

Semaphore

_signal替換為一個整型欄位可以得到Semaphore的基礎版本

public class 模擬號誌
{
    private readonly object _locker = new object();
    private int _count, _initialCount;
    public 模擬號誌(int initialCount)
    {
        _initialCount = initialCount;
    }
    
    void WaitOne()  // +1
    {
        lock (_locker)
        {
            _count++;
            while (_count >= _initialCount)
            {
                Monitor.Wait(_locker);
            }
        }
    }

    void Release()  // -1
    {
        lock (_locker)
        {
            _count --;
            Monitor.Pulse(_locker);
        }
    }
}

模擬CountdownEvent

是不是非常類似號誌?

public class 模擬CountdownEvent
{
    private object _locker = new object();
    private int _initialCount;

    public 模擬CountdownEvent(int initialCount)
    {
        _initialCount = initialCount;
    }

    public void Signal()  // +1
    {
        AddCount(-1);
    }

    public void AddCount(int amount)  // +amount
    {
        lock (_locker)
        {
            _initialCount -= amount;
            if (_initialCount <= 0) Monitor.PulseAll(_locker);
        }
    }

    public void Wait()
    {
        lock (_locker)
        {
            while (_initialCount > 0)
                Monitor.Wait(_locker);
        }
    }
}

執行緒會合

CountdownEvent

利用我們剛剛實現的模擬CountdownEvent,來實現兩個執行緒的會和,和同步基礎中提到的WaitHandle.SignalAndWait一樣。

並且我們也可以通過initialCount將會和的執行緒擴充套件到更多個,顯而易見的強大。

public class 執行緒會和測試
{
    private readonly ITestOutputHelper _testOutputHelper;
    private 模擬CountdownEvent _countdown = new 模擬CountdownEvent(2);

    public 執行緒會和測試(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    public void Show()
    {
        // 每個執行緒都睡眠一段隨機時間
        Random r = new Random();
        new Thread(Mate).Start(r.Next(10000));
        Thread.Sleep(r.Next(10000));

        _countdown.Signal();
        _countdown.Wait();

        _testOutputHelper.WriteLine("Mate! ");
    }

    void Mate(object delay)
    {
        Thread.Sleep((int)delay);

        _countdown.Signal(); //+1
        _countdown.Wait();

        _testOutputHelper.WriteLine("Mate! ");
    }
}

上面例子,每個執行緒隨機休眠一段時間,然後等待對方,他們幾乎在同時列印」Mate!「,這被稱為執行緒執行屏障(thread execution barrier)

當你想讓多個執行緒執行一個系列任務,希望它們步調一致時,可以用到執行緒執行屏障。然而,我們現在的解決方案有一定限制:我們不能重用同一個Countdown物件來第二次會合執行緒,至少在沒有額外訊號構造的情況下不能。為解決這個問題,Framework 4.0 提供了一個新的類Barrier

Barrier

Framework 4.0 加入的一個訊號構造。它實現了執行緒執行屏障(thread execution barrier),允許多個執行緒在一個時間點會合。這個類非常快速和高效,它是建立在Wait / Pulse和自旋鎖基礎上的。

  1. 範例化它,指定有多少個執行緒參與會合(可以呼叫AddParticipants / RemoveParticipants來進行更改)。

    public Barrier(int participantCount)
    
  2. 當希望會合時,呼叫SignalAndWait。表示參與者已到達障礙,並等待所有其他參與者到達障礙

    public void SignalAndWait()
    

    他還實現了共同作業取消模式

    public void SignalAndWait(CancellationToken cancellationToken)
    

    並提供了超時時間的過載,返回一個bool型別,true標識在規定的時間,其他參與者到達障礙,false標識沒有全部到達

    public bool SignalAndWait(TimeSpan timeout)
    

範例化Barrier,引數為 3 ,意思是呼叫SignalAndWait會被阻塞直到該方法被呼叫 3 次。但與CountdownEvent不同,它會自動復位:再呼叫SignalAndWait仍會阻塞直到被呼叫 3 次。這允許你保持多個執行緒「步調一致」,讓它們執行一個系列任務。

下邊的例子中,三個執行緒步調一致地列印數位 0 到 4:

private readonly ITestOutputHelper _testOutputHelper;
private Barrier _barrier = new Barrier(3);
public Barrier測試(ITestOutputHelper testOutputHelper)
{
    _testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
    new Thread(Speak).Start();
    new Thread(Speak).Start();
    new Thread(Speak).Start();
}
void Speak()
{
    for (int i = 0; i < 5; i++)
    {
        _testOutputHelper.WriteLine(i.ToString());
        _barrier.SignalAndWait();
    }
}

Barrier還提供一個非常用有的構造引數,他是一個委託,會在每個會和處執行。不用擔心搶佔,因為當它被執行時,所有的參與者都是被阻塞的。

public Barrier(int participantCount, Action<Barrier>? postPhaseAction)

五、拓展

前景回顧:

還記得我們在講同步的時候提到的最小化共用資料無狀態設計嗎?經過前面的學習,稍加思考,其實引發執行緒安全的本質是多執行緒並行下的資料互動問題。如果我們的資料線上程之間沒有互動,或者說我們的資料都是唯讀的,那不就天然的執行緒安全了嗎?

現在你能理解為什麼唯讀欄位是天然執行緒安全的了嗎?

然而有的場景下又需要對公共資料進行讀寫,同步篇中我們通過很簡單的排它鎖來保證執行緒安全,在這裡,我們不在滿足這種粗暴的粒度(事實上多數時候讀總是多於寫),這時,讀寫鎖出現了。

ReaderWriterLockSlim

ReaderWriterLockSlim在 Framework 3.5 加入的,被加入了standard 1.0,此型別是執行緒安全的,用於保護由多個執行緒讀取的資源。

ReaderWriterLockSlim出現的目的是為了取締ReaderWriterLock,他簡化了遞迴規則以及鎖狀態的升級和降級規則。避免了許多潛在的死鎖情況。 另外,他的效能顯著優於ReaderWriterLock。 建議對所有新開發的專案使用ReaderWriterLockSlim

然而如果與普通的lockMonitor.Enter / Exit)對比,他還是要慢一倍。

ReaderWriterLockSlim有三種模式:

  • 讀取模式:允許任意多的執行緒處於讀取模式

  • 可升級模式:只允許一個執行緒處於可升級模式,與讀鎖相容

  • 寫入模式:完全互斥,不允許任何模式下的執行緒獲取任何鎖

ReaderWriterLockSlim定義瞭如下的方法來獲取和釋放讀 / 寫鎖:

public void EnterReadLock();
public void ExitReadLock();
public void EnterWriteLock();
public void ExitWriteLock();

另外,對應所有EnterXXX的方法,都有相應的TryXXX版本,可以接受一個超時引數,與Monitor.TryEnter類似。

讓我們來看一個案例:

模擬三個讀執行緒,兩個寫執行緒,並行執行

new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Write).Start();
new Thread(Write).Start();

讀方法是這樣的

while (true)
{
    _rw.EnterReadLock();
    foreach (int number in _items)
    {
        Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
        Thread.Sleep(100);
    }
    _rw.ExitReadLock();
}

寫方法是這樣的

while (true)
{
    int number = _rand.Value.Next(100);
    _rw.EnterWriteLock();
    _items.Add(number);
    _rw.ExitWriteLock();
    Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
    Thread.Sleep(100);
}

亂數生成方法就是用的TLS講過的

new ThreadLocal<Random>(() => new Random(Guid.NewGuid().GetHashCode()));

需要注意ReaderWriterLockSlim實現了IDisposable,用完了請記得釋放

public class ReaderWriterLockSlim : IDisposable

執行結果:

Thread 11 added 42
Thread 8 reading 42
Thread 6 reading 42
Thread 7 reading 42
Thread 10 added 98
Thread 8 reading 42
...

顯而易見的,並行度變高了

鎖遞迴

ReaderWriterLockSlim提供一個構造引數LockRecursionPolicy用於設定鎖遞迴策略

public ReaderWriterLockSlim(LockRecursionPolicy recursionPolicy)
public enum LockRecursionPolicy
{
  /// <summary>If a thread tries to enter a lock recursively, an exception is thrown. Some classes may allow certain recursions when this setting is in effect.</summary>
  NoRecursion,
  /// <summary>A thread can enter a lock recursively. Some classes may restrict this capability.</summary>
  SupportsRecursion,
}

預設情況下是使用NoRecursion策略:不允許遞迴或重入,這與GO的讀寫鎖設計不謀而合,建議使用此預設策略,因為遞迴引入了不必要的複雜性,並使程式碼更易於死鎖。

public ReaderWriterLockSlim() : this(LockRecursionPolicy.NoRecursion)

開啟支援遞迴策略後,以下程式碼不會丟擲LockRecursionException異常

var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();

遞迴鎖定級別只能越來越小,級別順序如下:讀鎖,可升級鎖,寫鎖。下面程式碼會丟擲LockRecursionException異常

void F()
{
    var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
    rw.EnterReadLock();
    rw.EnterWriteLock();
    rw.EnterWriteLock();
    rw.ExitReadLock();
}
Assert.Throws<LockRecursionException>(F);

可升級鎖例外,把可升級鎖升級為寫鎖是合法的。

var rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
rw.EnterUpgradeableReadLock();
rw.EnterWriteLock();
rw.ExitWriteLock();
rw.ExitUpgradeableReadLock();

思考一個問題:為什麼只允許一個執行緒處於可升級模式?

SQL Server ReaderWriterLockSlim
共用鎖(Share lock) 讀鎖(Read lock)
排它鎖(Exclusive lock) 寫鎖(Write lock)
更新鎖(Update lock) 可升級鎖(Upgradeable lock)

Timer

如果你需要使用規律的時間間隔重複執行一些方法,這個例子會使得一個執行緒永遠被佔用

while (true)
{
    // do something
    Thread.Sleep(1000);
}

這時候你會需要Timer

建立計時器時,可以指定在方法首次執行之前等待的時間 dueTime ,以及後續執行之間等待的時間period。 類 Timer 的解析度與系統時鐘相同。 這意味著,如果period小於系統時鐘的解析度,委託將以系統時鐘解析度定義的時間間隔執行,在Windows 7 和Windows 8系統上大約為 15 毫秒。

public Timer(TimerCallback callback, object? state, int dueTime, int period)

下面這個例子首次間隔1s,之後間隔500ms列印tick...

Timer timer = new Timer ((data) =>
{
    _testOutputHelper.WriteLine(data.ToString());
}, "tick...", 1000, 500);
Thread.Sleep(3000);
timer.Dispose();

計時器委託是在構造計時器時指定的,不能更改。 該方法不會在建立計時器的執行緒上執行;而是在執行緒池(thread pool)執行。

如果計時器間隔period小於執行回撥所需的時間,或者如果所有執行緒池執行緒都在使用,並且回撥被多次排隊,則可以在兩個執行緒池執行緒上同時執行回撥。

只要使用 Timer,就必須保留對它的參照。 與任何託管物件一樣,當沒有對其參照時,會受到垃圾回收的約束。 即使 Timer 仍然處於活動狀態也不會阻止它被收集。

不再需要計時器時,請呼叫 Dispose 釋放計時器持有的資源。請注意,呼叫 Dispose() 後仍然可能會發生回撥,因為計時器將回撥排隊供執行緒池執行緒執行。可以使用public bool Dispose(WaitHandle notifyObject)過載等待所有回撥完成。

System.Threading.Timer是一個普通計時器。 它會回撥一個執行緒池執行緒(來自工作池)。

System.Timers.Timer是一個System.ComponentModel.Component ,它包裝System.Threading.Timer ,並提供一些用於在特定執行緒上排程的附加功能。