訊號構造本質:一個執行緒阻塞直到收到另一個執行緒發來的通知。
當多執行緒Wait
同一物件時,就形成了一個「等待佇列(waiting queue)」,和用於等待獲得鎖的「就緒佇列(ready queue)」不同,每次呼叫Pulse
時會釋放隊頭執行緒,它會進入就緒佇列,然後重新獲取鎖。可以把它想象成一個自動停車場,首先你在收費站(等待佇列)排隊驗票,然後在柵欄前(就緒佇列)排隊等待放行。
這個佇列結構天然有序,但是,對於Wait/Pulse
應用通常不重要,在這種場景下把它想象成一個等待執行緒的「池(pool)」更好理解,每次呼叫Pulse
都會從池中釋放一個等待執行緒。
PulseAll
釋放整個等待佇列或者說等待池。收到Pulse
的執行緒不會完全同時開始執行,而是有序的執行,因為每個Wait
語句都要試圖重新獲取同一把鎖。他們的效果就是,PulseAll
將執行緒從等待佇列移到就緒佇列中,讓它們可以繼續有序執行。
使用Wait/Pulse
需要注意:
Wait / Pulse
不能lock塊之外使用,否則會拋異常。Pulse
最多釋放一個執行緒,而PulseAll
釋放所有執行緒。Wait
會立即釋放當前持有的鎖,然後進入阻塞,等待脈衝定義一個欄位,作為同步物件
private readonly object _locker = new object();
定義一個或多個欄位,作為阻塞條件
private bool _ok;
當你希望阻塞的時候
Monitor.Wait
在等待脈衝時,同步物件上的鎖會被釋放,並且進入阻塞狀態,直到收到 _locker上的脈衝,收到脈衝後重新獲取 _locker,如果此時 _locker 已經被別的執行緒佔有,則繼續阻塞,直至_獲取 _locker
lock (_locker)
{
while (!_ok)
{
Monitor.Wait (_locker);
}
}
當你希望改變阻塞條件時
lock (_locker)
{
_ok = true;
Monitor.Pulse(_locker); // Monitor.PulseAll(_locker);
}
Wait
和Pulse
幾乎是萬能的,通過一個bool標識我們就能實現AutoResetEvent/ManualResetEvent的功能,同理使用一個整形欄位,就可以實現CountdownEvent/Semaphore
效能方面,呼叫Pulse
花費大概約是在等待控制程式碼上呼叫Set
三分之一的時間。但是,使用Wait
和Pulse
進行訊號同步,對比事件等待控制程式碼有以下缺點:
Wait / Pulse
不能跨越應用程式域和程序使用。
必須通過鎖保護所有訊號同步邏輯涉及的變數。
呼叫Wait
方法時,你可以設定一個超時時間,可以是毫秒或TimeSpan
的形式。如果因為超時而放棄了等待,那麼Wait
方法就會返回false
。
public static bool Wait(object obj, TimeSpan timeout)
如果在超時到達時仍然沒有獲得一個脈衝,CLR會主動給它傳送一個虛擬的脈衝(virtual pulse),使其能夠重新獲得鎖,然後繼續執行,就像收到一個真實脈衝一樣。
下面這個例子非常有用,它可以定期的檢查阻塞條件。即使其它執行緒無法按照預期傳送脈衝,例如程式之後被其他人修改,但沒能正確使用Pulse
,這樣也可以在一定程度上免疫 bug。因此在複雜的同步設計中可以給所有Wait
指定超時時間。
lock (_locker)
while (/* <blocking-condition> */)
Monitor.Wait (_locker, /* <timeout> */);
Monitor.Wait
的boolean型別返回值其實還可以這麼理解:其返回值意味著是否獲得了一個「真實的脈衝「。如果」虛擬的脈衝「並不是期待的行為,可以記錄紀錄檔或丟擲異常。
Wait
等待一個變數上的脈衝,Pulse
對一個變數傳送脈衝。脈衝也是一種訊號形式,相對於事件等待控制程式碼那種鎖存(latching)訊號,脈衝顧名思義是一種非鎖存或者說易失的訊號
Monitor.Pulse
是一種單向通訊機制:傳送脈衝的執行緒不關心發出的脈衝被誰收到了,他沒有返回值,不會阻塞,內部也沒有確認機制。
當一個執行緒發起一次脈衝:
然後我們有一些場景依賴等待執行緒能夠在收到脈衝後及時的響應,此時,雙向訊號出現了,這是一種自定義的確認機制。
在上文的訊號構造基礎上改造一個競爭狀態的案例:
public class 競爭狀態測試
{
private readonly ITestOutputHelper _testOutputHelper;
private readonly object _locker = new object();
private bool _ok;
public 競爭狀態測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
new Thread(() => // Worker
{
for (int i = 0; i < 5; i++)
lock (_locker)
{
while (!_ok) Monitor.Wait(_locker);
_ok = false;
_testOutputHelper.WriteLine("Wassup?");
}
}).Start();
for (int i = 0; i < 5; i++)
{
lock (_locker)
{
_ok = true;
Monitor.Pulse(_locker);
}
}
}
}
我們期待的結果:
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
實際上這個這個程式可能一次」Wassup?「都不會輸出:主執行緒可能在工作執行緒啟動之前完成,這五次Pulse
啥事都沒幹
還記得我們講事件等待控制程式碼時,使用AutoResetEvent
來模擬的雙向訊號嗎?現在使用Monitor來實現一個擴充套件性更好的版本
public class 雙向訊號測試
{
private readonly ITestOutputHelper _testOutputHelper;
private readonly object _locker = new();
private bool _entry; // 我是否可以工作了
private bool _ready; // 我是否可以繼續投遞了
public 雙向訊號測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
new Thread(() =>
{
Thread.Sleep(100);
for (int i = 0; i < 5; i++)
{
lock (_locker)
{
_ready = true;
Monitor.PulseAll(_locker);
while (!_entry) Monitor.Wait(_locker);
_entry = false;
_testOutputHelper.WriteLine("Wassup?");
}
}
}).Start();
for (int i = 0; i < 5; i++)
{
lock (_locker)
{
while (!_ready) Monitor.Wait(_locker);
_ready = false;
_entry = true;
Monitor.PulseAll(_locker);
}
}
}
}
我們仍然使用_ready
來作為上游脈衝執行緒的自旋條件,使用_entry
作為下游等待執行緒的自旋條件。由於我們的邏輯都在lock語句中,即使之後引入了第三個執行緒,我們的邏輯仍然不會出問題,_ready
和_entry
的讀寫總是原子的。
這次,我們將允許多個消費者,各自擁有獨立的消費執行緒。使用一個陣列來存放這些執行緒,並且他們接收的不再是string,而是更加靈活的委託:
private Thread[] _workers;
private Queue<Action> _queue = new Queue<Action>();
和上次一樣,我們傳遞null來告知消費者執行緒退出:
foreach (var worker in _workers)
{
AddTask(null);
}
在告知消費執行緒退出後Join
這些執行緒,等待未完成的任務被消費:
foreach (var worker in _workers)
{
worker.Join();
}
每個工作執行緒會執行一個名為Consume
的方法。我們在構造佇列時迴圈建立和啟動這些執行緒:
_workers = new Thread[workerCount];
for (int i = 0; i < workerCount; i++)
{
_workers[i] = new Thread(Consume);
_workers[i].Start();
}
消費Comsume
方法,一個工作執行緒從佇列中取出並執行一個專案。我們希望工作執行緒沒什麼事情做的時候,或者說當佇列中沒有任何專案時,它們應該被阻塞。因此,我們的阻塞條件是_queue.Count == 0
:
private void Consume()
{
while (true)
{
Action task;
lock (_locker)
{
while (_queue.Count == 0)
{
Monitor.Wait(_locker); // 佇列裡沒任務,釋放鎖,進入等待
}
// 獲取新任務,重新持有鎖
task = _queue.Dequeue();
}
if (task == null) return; // 空任務代表退出
task(); // 執行任務
}
}
新增一個任務。出於效率考慮,加入一個任務時,我們呼叫Pulse
而不是PulseAll
。這是因為每個專案只需要喚醒(至多)一個消費者。如果你只有一個冰激凌,你不會把一個班 30 個正在睡覺的孩子都叫起來排隊獲取它。
public void AddTask(Action task)
{
lock (_locker)
{
_queue.Enqueue(task);
Monitor.Pulse(_locker);
}
}
在雙向訊號中,你可能注意到了一個模式:_flag
在當前執行緒被作為自旋阻塞條件,在另一執行緒中被設定為true
,跳出自旋
lock(_locker)
{
while (!_flag) Monitor.Wait(_locker);
_flag = false;
}
事實上它的工作原理就是模仿AutoResetEvent
。如果去掉_flag=false
,就得到了ManualResetEvent
的基礎版本。
private readonly object _locker = new object();
private bool _signal;
void WaitOne()
{
lock (_locker)
{
while (!_signal) Monitor.Wait(_locker);
}
}
void Set()
{
lock (_locker)
{
_signal = true;
Monitor.PulseAll(_locker);
}
}
void Reset()
{
lock (_locker) _signal = false;
}
使用PulseAll
,是因為可能存在多個被阻塞的等待執行緒。而EventWaitHandle.WaitOne()
的通行條件就是:門
是開著的,ManualResetEvent
被放行通過後不會自己關門,只能通過Reset
將門關上,再次期間其它所有阻塞執行緒都能通行。
實現AutoResetEvent
非常簡單,只需要將WaitOne
方法改為:
lock (_locker)
{
while (!_signal) Monitor.Wait(_locker);
_signal = false; // 新增一條,自己關門
}
然後將Set
方法改為:
lock (_locker)
{
_signal = true;
Monitor.Pulse(_locker); // PulseAll替換成Pulse:
}
把_signal
替換為一個整型欄位可以得到Semaphore
的基礎版本
public class 模擬號誌
{
private readonly object _locker = new object();
private int _count, _initialCount;
public 模擬號誌(int initialCount)
{
_initialCount = initialCount;
}
void WaitOne() // +1
{
lock (_locker)
{
_count++;
while (_count >= _initialCount)
{
Monitor.Wait(_locker);
}
}
}
void Release() // -1
{
lock (_locker)
{
_count --;
Monitor.Pulse(_locker);
}
}
}
是不是非常類似號誌?
public class 模擬CountdownEvent
{
private object _locker = new object();
private int _initialCount;
public 模擬CountdownEvent(int initialCount)
{
_initialCount = initialCount;
}
public void Signal() // +1
{
AddCount(-1);
}
public void AddCount(int amount) // +amount
{
lock (_locker)
{
_initialCount -= amount;
if (_initialCount <= 0) Monitor.PulseAll(_locker);
}
}
public void Wait()
{
lock (_locker)
{
while (_initialCount > 0)
Monitor.Wait(_locker);
}
}
}
利用我們剛剛實現的模擬CountdownEvent
,來實現兩個執行緒的會和,和同步基礎中提到的WaitHandle.SignalAndWait
一樣。
並且我們也可以通過initialCount
將會和的執行緒擴充套件到更多個,顯而易見的強大。
public class 執行緒會和測試
{
private readonly ITestOutputHelper _testOutputHelper;
private 模擬CountdownEvent _countdown = new 模擬CountdownEvent(2);
public 執行緒會和測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
public void Show()
{
// 每個執行緒都睡眠一段隨機時間
Random r = new Random();
new Thread(Mate).Start(r.Next(10000));
Thread.Sleep(r.Next(10000));
_countdown.Signal();
_countdown.Wait();
_testOutputHelper.WriteLine("Mate! ");
}
void Mate(object delay)
{
Thread.Sleep((int)delay);
_countdown.Signal(); //+1
_countdown.Wait();
_testOutputHelper.WriteLine("Mate! ");
}
}
上面例子,每個執行緒隨機休眠一段時間,然後等待對方,他們幾乎在同時列印」Mate!「,這被稱為執行緒執行屏障(thread execution barrier)
當你想讓多個執行緒執行一個系列任務,希望它們步調一致時,可以用到執行緒執行屏障。然而,我們現在的解決方案有一定限制:我們不能重用同一個Countdown
物件來第二次會合執行緒,至少在沒有額外訊號構造的情況下不能。為解決這個問題,Framework 4.0 提供了一個新的類Barrier
。
Framework 4.0 加入的一個訊號構造。它實現了執行緒執行屏障(thread execution barrier),允許多個執行緒在一個時間點會合。這個類非常快速和高效,它是建立在Wait / Pulse
和自旋鎖基礎上的。
範例化它,指定有多少個執行緒參與會合(可以呼叫AddParticipants / RemoveParticipants
來進行更改)。
public Barrier(int participantCount)
當希望會合時,呼叫SignalAndWait
。表示參與者已到達障礙,並等待所有其他參與者到達障礙
public void SignalAndWait()
他還實現了共同作業取消模式
public void SignalAndWait(CancellationToken cancellationToken)
並提供了超時時間的過載,返回一個bool
型別,true標識在規定的時間,其他參與者到達障礙,false標識沒有全部到達
public bool SignalAndWait(TimeSpan timeout)
範例化Barrier
,引數為 3 ,意思是呼叫SignalAndWait
會被阻塞直到該方法被呼叫 3 次。但與CountdownEvent
不同,它會自動復位:再呼叫SignalAndWait
仍會阻塞直到被呼叫 3 次。這允許你保持多個執行緒「步調一致」,讓它們執行一個系列任務。
下邊的例子中,三個執行緒步調一致地列印數位 0 到 4:
private readonly ITestOutputHelper _testOutputHelper;
private Barrier _barrier = new Barrier(3);
public Barrier測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
new Thread(Speak).Start();
new Thread(Speak).Start();
new Thread(Speak).Start();
}
void Speak()
{
for (int i = 0; i < 5; i++)
{
_testOutputHelper.WriteLine(i.ToString());
_barrier.SignalAndWait();
}
}
Barrier
還提供一個非常用有的構造引數,他是一個委託,會在每個會和處執行。不用擔心搶佔,因為當它被執行時,所有的參與者都是被阻塞的。
public Barrier(int participantCount, Action<Barrier>? postPhaseAction)
前景回顧:
還記得我們在講同步的時候提到的最小化共用資料和無狀態設計嗎?經過前面的學習,稍加思考,其實引發執行緒安全的本質是多執行緒並行下的資料互動問題。如果我們的資料線上程之間沒有互動,或者說我們的資料都是唯讀的,那不就天然的執行緒安全了嗎?
現在你能理解為什麼唯讀欄位是天然執行緒安全的了嗎?
然而有的場景下又需要對公共資料進行讀寫,同步篇中我們通過很簡單的排它鎖來保證執行緒安全,在這裡,我們不在滿足這種粗暴的粒度(事實上多數時候讀總是多於寫),這時,讀寫鎖出現了。
ReaderWriterLockSlim
在 Framework 3.5 加入的,被加入了standard 1.0,此型別是執行緒安全的,用於保護由多個執行緒讀取的資源。
ReaderWriterLockSlim
出現的目的是為了取締ReaderWriterLock
,他簡化了遞迴規則以及鎖狀態的升級和降級規則。避免了許多潛在的死鎖情況。 另外,他的效能顯著優於ReaderWriterLock
。 建議對所有新開發的專案使用ReaderWriterLockSlim
然而如果與普通的
lock
(Monitor.Enter / Exit
)對比,他還是要慢一倍。
ReaderWriterLockSlim
有三種模式:
讀取模式:允許任意多的執行緒處於讀取模式
可升級模式:只允許一個執行緒處於可升級模式,與讀鎖相容
寫入模式:完全互斥,不允許任何模式下的執行緒獲取任何鎖
ReaderWriterLockSlim
定義瞭如下的方法來獲取和釋放讀 / 寫鎖:
public void EnterReadLock();
public void ExitReadLock();
public void EnterWriteLock();
public void ExitWriteLock();
另外,對應所有EnterXXX
的方法,都有相應的TryXXX
版本,可以接受一個超時引數,與Monitor.TryEnter
類似。
讓我們來看一個案例:
模擬三個讀執行緒,兩個寫執行緒,並行執行
new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Write).Start();
new Thread(Write).Start();
讀方法是這樣的
while (true)
{
_rw.EnterReadLock();
foreach (int number in _items)
{
Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
Thread.Sleep(100);
}
_rw.ExitReadLock();
}
寫方法是這樣的
while (true)
{
int number = _rand.Value.Next(100);
_rw.EnterWriteLock();
_items.Add(number);
_rw.ExitWriteLock();
Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
Thread.Sleep(100);
}
亂數生成方法就是用的TLS講過的
new ThreadLocal<Random>(() => new Random(Guid.NewGuid().GetHashCode()));
需要注意ReaderWriterLockSlim
實現了IDisposable
,用完了請記得釋放
public class ReaderWriterLockSlim : IDisposable
執行結果:
Thread 11 added 42
Thread 8 reading 42
Thread 6 reading 42
Thread 7 reading 42
Thread 10 added 98
Thread 8 reading 42
...
顯而易見的,並行度變高了
ReaderWriterLockSlim
提供一個構造引數LockRecursionPolicy
用於設定鎖遞迴策略
public ReaderWriterLockSlim(LockRecursionPolicy recursionPolicy)
public enum LockRecursionPolicy
{
/// <summary>If a thread tries to enter a lock recursively, an exception is thrown. Some classes may allow certain recursions when this setting is in effect.</summary>
NoRecursion,
/// <summary>A thread can enter a lock recursively. Some classes may restrict this capability.</summary>
SupportsRecursion,
}
預設情況下是使用NoRecursion
策略:不允許遞迴或重入,這與GO的讀寫鎖設計不謀而合,建議使用此預設策略,因為遞迴引入了不必要的複雜性,並使程式碼更易於死鎖。
public ReaderWriterLockSlim() : this(LockRecursionPolicy.NoRecursion)
開啟支援遞迴策略後,以下程式碼不會丟擲LockRecursionException
異常
var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();
遞迴鎖定級別只能越來越小,級別順序如下:讀鎖,可升級鎖,寫鎖
。下面程式碼會丟擲LockRecursionException
異常
void F()
{
var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterReadLock();
rw.EnterWriteLock();
rw.EnterWriteLock();
rw.ExitReadLock();
}
Assert.Throws<LockRecursionException>(F);
可升級鎖例外,把可升級鎖升級為寫鎖是合法的。
var rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
rw.EnterUpgradeableReadLock();
rw.EnterWriteLock();
rw.ExitWriteLock();
rw.ExitUpgradeableReadLock();
思考一個問題:為什麼只允許一個執行緒處於可升級模式?
SQL Server | ReaderWriterLockSlim |
---|---|
共用鎖(Share lock) | 讀鎖(Read lock) |
排它鎖(Exclusive lock) | 寫鎖(Write lock) |
更新鎖(Update lock) | 可升級鎖(Upgradeable lock) |
如果你需要使用規律的時間間隔重複執行一些方法,這個例子會使得一個執行緒永遠被佔用
while (true)
{
// do something
Thread.Sleep(1000);
}
這時候你會需要Timer
建立計時器時,可以指定在方法首次執行之前等待的時間 dueTime
,以及後續執行之間等待的時間period
。 類 Timer 的解析度與系統時鐘相同。 這意味著,如果period
小於系統時鐘的解析度,委託將以系統時鐘解析度定義的時間間隔執行,在Windows 7 和Windows 8系統上大約為 15 毫秒。
public Timer(TimerCallback callback, object? state, int dueTime, int period)
下面這個例子首次間隔1s,之後間隔500ms列印tick...
Timer timer = new Timer ((data) =>
{
_testOutputHelper.WriteLine(data.ToString());
}, "tick...", 1000, 500);
Thread.Sleep(3000);
timer.Dispose();
計時器委託是在構造計時器時指定的,不能更改。 該方法不會在建立計時器的執行緒上執行;而是在執行緒池(thread pool)執行。
如果計時器間隔
period
小於執行回撥所需的時間,或者如果所有執行緒池執行緒都在使用,並且回撥被多次排隊,則可以在兩個執行緒池執行緒上同時執行回撥。只要使用 Timer,就必須保留對它的參照。 與任何託管物件一樣,當沒有對其參照時,會受到垃圾回收的約束。 即使 Timer 仍然處於活動狀態也不會阻止它被收集。
不再需要計時器時,請呼叫 Dispose 釋放計時器持有的資源。請注意,呼叫 Dispose() 後仍然可能會發生回撥,因為計時器將回撥排隊供執行緒池執行緒執行。可以使用
public bool Dispose(WaitHandle notifyObject)
過載等待所有回撥完成。
System.Threading.Timer
是一個普通計時器。 它會回撥一個執行緒池執行緒(來自工作池)。
System.Timers.Timer
是一個System.ComponentModel.Component
,它包裝System.Threading.Timer
,並提供一些用於在特定執行緒上排程的附加功能。