當執行緒 A 在等待一個同步構造,另一個執行緒 B 持有構造一直不釋放,那麼就會導致執行緒 A 阻塞。同步構造有使用者模式構造和核心模式構造。
活鎖和死鎖:
當執行緒獲取不到資源,從而不停在 CPU 上自旋等待資源,就會形成活鎖。這是通過使用者構造實現的。
當執行緒獲取不到資源,被作業系統阻塞,就會形成死鎖。這是通過核心構造實現的。
.Net 提供了兩種使用者構造,易變構造 Volatile、互鎖構造 Interlocked,這兩種構造都提供了原子性讀寫的功能。
.Net 提供了基於易變構造、互鎖構造、SpinWait 實現的自旋鎖 SpinLock。
原子性讀寫:
在 32 位 CPU 中,CPU 一次只能儲存 32 位的資料,所以如果是 64 位的資料型別(如 double),就得執行兩次 MOV 指令,所以在 32 位 CPU 和 32 位元運算系統中,不同執行緒對 64 位的資料型別進行讀寫可能得到不同的結果。原子性讀寫就是保證了即使是 64 位的資料型別,不同執行緒讀寫也會得到相同的結果。現在的 CPU 和作業系統基本都是 64 位的,所以一般也不會遇到這種問題。
Volatile 一般用於阻止編譯器程式碼優化,編譯器優化程式碼會優化掉一些在單執行緒情況下無用的變數或者語句,在多執行緒程式碼下有時候會導致程式執行結果跟設計的不一樣。
Volatile.Read() 強制對變數的取值必須在呼叫時讀取,Volatile.Write() 強制對變數的賦值必須在呼叫時寫入。
/// <summary>
/// 在 debug 模式下不開啟程式碼優化,所以需要用 release 模式下生成。
/// 執行 dotnet build -c release --no-incremental 後執行程式碼,如果沒有標記為易變,則不會列印 x。
/// </summary>
public void Test2()
{
var switchTrue = false;
var t = new Thread(() =>
{
var x = 0;
while (!switchTrue) // 如果沒有標記變數為易變,編譯器會把 while(!switchTrue) 優化為 while(true) 從而導致永遠不會列印出 x 的值
//while (!Volatile.Read(ref switchTrue)) // 標記為易變,可以保證在呼叫時才進行取值,不會進行程式碼優化。
{
x++;
}
Console.WriteLine($"x: {x}");
});
t.IsBackground = true;
t.Start();
Thread.Sleep(100);
switchTrue = true;
Console.WriteLine("ok");
}
/// <summary>
/// 用 Interlocked 實現一個簡單的自旋鎖
/// 注意:
/// 1. 自旋鎖在獲取不到鎖的時候,會進行空轉。所以在自旋的時候,會佔用 CPU,所以一般不在單 CPU 機器上用。
/// 2. 當佔有鎖的執行緒優先順序比獲取鎖的執行緒更低的時候,會導致佔有鎖的執行緒一直獲取不到CPU進行工作,從而無法釋放鎖,導致活鎖。
/// 所以使用自旋鎖的執行緒,應該禁用執行緒優先順序提升功能。
/// </summary>
public class SimpleSpinLock
{
private int _count;
public void Enter()
{
while (true)
{
if (Interlocked.Exchange(ref _count, 1) == 0)
{
return;
}
}
}
public void Exit()
{
Volatile.Write(ref _count, 0);
}
}
/// <summary>
/// 使用 Interlocked 實現的單例,輕量且簡單。
/// 可能會同時呼叫多次建構函式,所以適合建構函式沒有副作用的類
/// </summary>
internal class DoubleCheckLocking3
{
private static DoubleCheckLocking3? _value;
private DoubleCheckLocking3()
{
}
private DoubleCheckLocking3 GetInstance()
{
if (_value != null) return _value;
Interlocked.CompareExchange(ref _value, new DoubleCheckLocking3(), null);
return _value;
}
}
/// <summary>
/// 使用 lock 和雙檢索實現的單例化
/// </summary>
internal class DoubleCheckLocking
{
private static DoubleCheckLocking? _value;
private static readonly object _lock = new();
private DoubleCheckLocking()
{
}
public static DoubleCheckLocking GetInstance()
{
if (_value != null) return _value;
lock (_lock)
{
if (_value == null)
{
var t = new DoubleCheckLocking();
Volatile.Write(ref _value, t);
}
}
return _value;
}
}
.Net 提供了一個輕量化的同步構造 SpinLock,很適合在不常發生競爭的場景使用。如果發生競爭了,會先在 CPU 上自旋一段時間,如果還不能獲取到資源,就會讓出 CPU 控制權給其他執行緒(使用 SpinWait 實現的)。
重入鎖(Re-Enter): 就是一個執行緒呼叫了 SpinLock.Enter() 後,沒有呼叫 SpinLock.Exit(),再次呼叫了 SpinLock.Enter()。
/// <summary>
/// 測試 SpinLock 重入鎖
/// </summary>
public void Test3()
{
var spinLock = new SpinLock(true); // 如果傳 true,如果 SpinLock 重入鎖,就會丟擲異常,傳 false 則不會,只會死鎖。
ThreadPool.QueueUserWorkItem(_ => DoWork());
void DoWork()
{
var lockTaken = false;
for (int i = 0; i < 10; i++)
{
try
{
Thread.Sleep(100);
if (!spinLock.IsHeldByCurrentThread) // SpinLock.IsHeldByCurrentThread 可以判斷是不是當前執行緒擁有鎖,如果是就不再獲取鎖
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 獲取鎖,i 為 {i}");
spinLock.Enter(ref lockTaken);
}
//spinLock.Enter(ref lockTaken); // 重入鎖會死鎖
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
if (lockTaken) // 使用 lockTaken 來判斷鎖是否已經被持有
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 釋放鎖");
spinLock.Exit();
}
Console.WriteLine("結束");
}
}
/// <summary>
/// 測試裝箱拆箱問題
/// </summary>
public void Test4()
{
var spinLock = new SpinLock(false);
Task.Run(() => DoWork(ref spinLock));
Task.Run(() => DoWork(ref spinLock));
// SpinLock 是 Struct 型別,要注意裝箱拆箱的問題,試試看不加 ref 關鍵字的效果
void DoWork(ref SpinLock spinLock)
{
var lockTaken = false;
Thread.Sleep(500);
spinLock.Enter(ref lockTaken);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 獲取鎖");
}
}
.Net 提供了 System.Threading.WaitHandle 和 WaitHandle 的子類來支援核心構造,WaitHandle 封裝核心同步構造的控制程式碼,並且提供了操作的方法,並且每個方法都會在呼叫處建立記憶體屏障。
WaitHandle 有以下實現類,這些類定義了一個訊號機制,根據訊號去釋放執行緒或者阻塞執行緒,用於在多執行緒的場景下存取共用資源:
WaitHandle:抽象基礎類別,封裝了系統核心構造的控制程式碼。繼承自 MarshalByRefObject,所以可以跨程序和 domain 邊界。
WaitHandle 有以下常用方法:
public class WaitHandleDemo
{
/// <summary>
/// 測試 WaitHandle.WaitAll(), 成功執行返回 true, 支援超時,當超時時,返回 false
/// WaitHandle.WaitAny(), 成功執行返回對應的 索引,支援超時,當超時時,返回 WaitHandle.WaitTimeout
/// </summary>
public void Test()
{
var waitHandleList = new WaitHandle[] { new AutoResetEvent(false), new AutoResetEvent(false) };
ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
var timeout = WaitHandle.WaitAll(waitHandleList);
Console.WriteLine($"是否超時:{!timeout},WaitHandle.WaitAll() 結束");
Thread.Sleep(500);
ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
timeout = WaitHandle.WaitAll(waitHandleList,1000);
Console.WriteLine($"是否超時:{!timeout},WaitHandle.WaitAll() 結束");
ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
var index = WaitHandle.WaitAny(waitHandleList);
Console.WriteLine($"{index} 已經結束執行,WaitHandle.WaitAny() 結束");
ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[0]);
ThreadPool.QueueUserWorkItem(DoWork, waitHandleList[1]);
index = WaitHandle.WaitAny(waitHandleList, 1000);
Console.WriteLine($"是否超時:{WaitHandle.WaitTimeout == index},WaitHandle.WaitAny() 結束");
void DoWork(object? state)
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 開始");
var r = new Random();
var interval = 1000 * r.Next(2, 10);
Thread.Sleep(interval);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 結束");
((AutoResetEvent)state).Set();
}
}
/// <summary>
/// 測試 WaitHandle.SignalAndWait(), 成功執行返回 true, 支援超時,當超時時,返回 false
/// </summary>
public void Test2()
{
var are = new AutoResetEvent(false);
var are2 = new AutoResetEvent(false);
foreach (var i in Enumerable.Range(1,5))
{
Console.WriteLine($"按下 Enter 啟動執行緒 {i}");
Console.ReadLine();
var t = new Thread(DoWork)
{
Name = $"執行緒 {i}"
};
t.Start();
WaitHandle.SignalAndWait(are, are2); // 給 are 發訊號,同時等待 are2
}
Console.WriteLine("全部執行緒執行結束");
void DoWork()
{
are.WaitOne();
Console.WriteLine($"{Thread.CurrentThread.Name} 開始");
Thread.Sleep(1000);
Console.WriteLine($"{Thread.CurrentThread.Name} 結束");
are2.Set();
}
}
}
EventWaitHandle、ManualResetEvent、AutoResetEvent 是核心同步構造,EventWaitHandle 由核心維護了一個 bool 變數,為 false 的時候阻塞執行緒,為 true 的時候釋放執行緒。ManualResetEvent、AutoResetEvent 繼承自 EventWaitHandle,所以擁有一樣的行為,同時可以跨程序跨 domain 通訊。
ManualResetEventSlim 並不繼承自 EventWaitHandle,只是功能跟 ManualResetEvent、AutoResetEvent 一樣的混合同步構造,使用使用者構造和核心構造混合實現,遇到競爭的情況,會先自旋一下,還無法獲取到資源,再使用核心構造阻塞執行緒,所以有更好的效能。
/// <summary>
/// 測試 EventWaitHandle 跟其他執行緒通訊
/// </summary>
public void Test2()
{
EventWaitHandle ewh;
if (EventWaitHandle.TryOpenExisting("multi-process", out ewh))
{
Console.WriteLine("等待 EventWaitHandle");
ewh.WaitOne();
Console.WriteLine("結束執行");
}
else
{
ewh = new EventWaitHandle(false, EventResetMode.AutoReset, "multi-process");
while (true)
{
Console.WriteLine("按下 Enter 跟其他執行緒通訊");
Console.ReadLine();
ewh.Set();
}
}
}
/// <summary>
/// 測試 ManualResetEvent.Set() 和 ManualResetEvent.Reset()
/// </summary>
public void Test1()
{
var mre = new ManualResetEvent(false);
foreach (var i in Enumerable.Range(1, 3))
{
StartThread(i);
}
Thread.Sleep(500);
Console.WriteLine("按下 Enter 呼叫 Set(),釋放所有執行緒");
Console.ReadLine();
mre.Set();
Thread.Sleep(500);
Console.WriteLine("ManualResetEvent 內部值為 true 時,不會阻塞執行緒。按下 Enter 啟動一個新執行緒進行測試");
Console.ReadLine();
StartThread(4);
Thread.Sleep(500);
Console.WriteLine("按下 Enter 呼叫 Reset(),可以再次阻塞執行緒");
Console.ReadLine();
mre.Reset();
Thread.Sleep(500);
foreach (var i in Enumerable.Range(5, 2))
{
StartThread(i);
}
Thread.Sleep(500);
Console.WriteLine("按下 Enter 呼叫 Set(),釋放所有執行緒,結束 demo");
Console.ReadLine();
mre.Set();
Thread.Sleep(500);
void StartThread(int i)
{
var t = new Thread(() =>
{
Console.WriteLine($"{Thread.CurrentThread.Name} 啟動並呼叫 WaitOne()");
mre.WaitOne();
Console.WriteLine($"{Thread.CurrentThread.Name} 結束執行");
})
{
Name = $"執行緒_{i}"
};
t.Start();
}
}
public void Test()
{
var are = new AutoResetEvent(false);
Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
Thread.Sleep(500);
Console.WriteLine("按下 Enter 釋放一個執行緒");
Console.ReadLine();
are.Set();
}
});
foreach (var i in Enumerable.Range(1,5))
{
var t = new Thread(DoWork);
t.Name = $"執行緒 {i}";
t.Start();
}
void DoWork()
{
Console.WriteLine($"{Thread.CurrentThread.Name} 開始");
are.WaitOne();
Console.WriteLine($"{Thread.CurrentThread.Name} 結束");
}
}
Semaphore 是一個核心構造,由核心維護了一個 Int32 變數,為當值為 0 時,阻塞執行緒,呼叫 Semaphore.Release() 會把變數加 1,呼叫 WaitHandle.WaitOne() 會把變數減 1。
SemaphoreSlim 是一個混合構造,功能跟 Semaphore 一致,使用使用者構造和核心構造混合實現,遇到競爭的情況,會先自旋一下,還無法獲取到資源,再使用核心構造阻塞執行緒,所以有更好的效能。
/// <summary>
/// 測試 Semaphore
/// </summary>
public void Test4()
{
var pool = new Semaphore(1, 3); // 初始化計數 1,最大計數 3
foreach (var i in Enumerable.Range(1, 5))
{
var t = new Thread(DoWork);
t.Name = $"執行緒 {i}";
t.Start();
}
Thread.Sleep(500);
Console.WriteLine("按下 Enter 釋放 3 個執行緒");
Console.ReadLine();
pool.Release(3); // 計數加3
Thread.Sleep(500);
Console.WriteLine("再按下 Enter 釋放 1 個執行緒");
Console.ReadLine();
pool.Release(); // 計數加1
void DoWork()
{
Console.WriteLine($"{Thread.CurrentThread.Name} 開始");
pool.WaitOne(); // 計數減1
Console.WriteLine($"{Thread.CurrentThread.Name} 結束");
}
}
/// <summary>
/// 測試跟其他程序通訊
/// </summary>
public void Test5()
{
Semaphore pool;
if (Semaphore.TryOpenExisting("multi-process", out pool))
{
Console.WriteLine("等待 Semaphore");
pool.WaitOne();
Console.WriteLine("結束");
}
else
{
pool = new Semaphore(0, 1, "multi-process"); // 最大計數設定為 1,每次只解除一個阻塞。
while (true)
{
Console.WriteLine("按下 Enter 跟其他執行緒通訊");
Console.ReadLine();
pool.Release();
}
}
}
Mutex 是一個核心構造,經常用於程序同步(如保證只有程式只能有一個程序)。功能跟 AutoResetEvent(false) 和 Semaphore(0,1) 類似,每次只能阻塞一個執行緒或者程序。
Mutex 跟 EventWaitHandle 和 Semaphore 不一樣的地方是,Mutex 要求執行緒一致(也就是獲取和釋放都必須在同一個執行緒),並且支援重入鎖。
/// <summary>
/// Mutex 支援重入鎖,支援執行緒一致
/// </summary>
public void Test()
{
var mutex = new Mutex(false);
var count = 0;
DoWork(mutex);
void DoWork(Mutex mutex)
{
try
{
mutex.WaitOne();
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 獲取 Mutex");
Interlocked.Increment(ref count);
Thread.Sleep(1000);
if (Interlocked.CompareExchange(ref count, 3, 3) == 3)
{
return;
}
DoWork(mutex);
}
finally
{
mutex.ReleaseMutex(); // 呼叫幾次 WaitOne() 就必須呼叫幾次 ReleaseMutex(),並且呼叫 WaitOne() 和 ReleaseMutex() 必須在同一個執行緒。
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 釋放 Mutex");
}
}
}
從上面可以看出,.Net 核心構造功能比使用者構造強大得多,所以看起來似乎直接使用核心構造,而不使用使用者模式構造更加明智。
但是使用者構造會比核心構造快,所以在不常發生競爭或者效能敏感的場景下,使用使用者構造會是一個更加優秀的做法。接下來用一個沒有競爭的空方法測試一下快多少。
internal class PerformanceDemo
{
/// <summary>
/// 測試使用者模式構造和核心模式構造,在鎖沒有發生競爭的情況下的效能差距
/// </summary>
public void Test()
{
var count = 1000 * 10000;
var spinLock = new SpinLock(false);
var are = new AutoResetEvent(true);
var pool = new Semaphore(1, 1);
var sw = Stopwatch.StartNew();
foreach (var _ in Enumerable.Range(0, count))
{
var lockTaken = false;
spinLock.Enter(ref lockTaken);
DoWork();
spinLock.Exit(lockTaken);
}
Console.WriteLine($"在沒有競爭的場景下,執行一個空方法一千萬次,SpinLock 耗時:{sw.ElapsedMilliseconds} ms");
sw.Restart();
foreach (var _ in Enumerable.Range(0, count))
{
are.WaitOne();
DoWork();
are.Set();
}
Console.WriteLine($"在沒有競爭的場景下,執行一個空方法一千萬次,AutoResetEvent 耗時:{sw.ElapsedMilliseconds} ms");
sw.Restart();
foreach (var _ in Enumerable.Range(0, count))
{
pool.WaitOne();
DoWork();
pool.Release();
}
Console.WriteLine($"在沒有競爭的場景下,執行一個空方法一千萬次,Semaphore 耗時:{sw.ElapsedMilliseconds} ms");
// 空方法
void DoWork()
{
}
}
}
// 輸出:
// 在沒有競爭的場景下,執行一個空方法一千萬次,SpinLock 耗時:184 ms
// 在沒有競爭的場景下,執行一個空方法一千萬次,AutoResetEvent 耗時:5449 ms
// 在沒有競爭的場景下,執行一個空方法一千萬次,Semaphore 耗時:5366 ms
最終在我的機子上測試,在沒有發生競爭的場景下,.NET 提供的使用者構造效能是核心構造的 30 倍,所以效能差距還是非常大的。
使用者構造在遇到競爭,在長時間獲取不到資源的場景,會一直在 CPU 上自旋,既浪費 CPU 時間,又耽誤其他執行緒執行,核心構造在作業系統的協調下,會把獲取不到資源的執行緒阻塞,不會浪費 CPU 時間。
核心構造在沒有競爭的場景下,效能會比使用者構造差幾十倍。
混合構造就是組合使用者構造和核心構造的實現,遇到競爭的時候,先使用使用者構造自旋一下,自旋一段時間還沒獲取到資源,就使用核心構造阻塞執行緒,這樣就能結合兩種構造的優點了。
.Net 提供了 ManualResetEventSlim、SemaphoreSlim、Monitor、lock 關鍵字、ReaderWriterLockSlim、CountDownEvent、Barrier 等混合構造,可以在不同的場景下使用。
通過這個例子可以瞭解一下是怎麼組合核心構造和使用者構造的。
/// <summary>
/// 一個簡單的混合構造,組合 AutoResetEvent 和 Interlocked 實現
/// </summary>
internal class SimpleHybridLock : IDisposable
{
private int _waiter;
private AutoResetEvent _waiterLock = new(false);
public void Enter()
{
if (Interlocked.Increment(ref _waiter) == 1)
{
return;
}
_waiterLock.WaitOne();
}
public void Exit()
{
if (Interlocked.Decrement(ref _waiter) == 0)
{
return;
}
_waiterLock.Set();
}
public void Dispose()
{
_waiterLock.Dispose();
}
}
lock 關鍵字是最常使用的同步構造了,lock 可以鎖定一個程式碼塊,保證每次只有一個執行緒存取執行該程式碼塊,lock 是基於 Montor 實現的,通過 try{...}finally{...} 把程式碼塊包圍起來。
/// <summary>
/// 測試 Monitor.Wait(object)、Monitor.Pulse(object)、Monitor.PulseAll(object)
/// 注意點:
/// 呼叫 Wait()、Pulse()、PulseAll() 也必須先呼叫 Enter() 獲取鎖,退出的時候也必須呼叫 Exit() 釋放鎖
/// </summary>
public void Test()
{
var lockObj = new object();
Task.Factory.StartNew(() =>
{
Thread.Sleep(500);
Console.WriteLine("按下 c 呼叫 Monitor.Pulse(object)");
if (Console.ReadKey().Key == ConsoleKey.C)
{
try
{
Monitor.Enter(lockObj);
Monitor.Pulse(lockObj);
}
finally
{
Monitor.Exit(lockObj);
}
}
Thread.Sleep(500);
if (Console.ReadKey().Key == ConsoleKey.C)
{
try
{
Monitor.Enter(lockObj);
Monitor.PulseAll(lockObj);
}
finally
{
Monitor.Exit(lockObj);
}
}
});
Parallel.Invoke(DoWork, DoWork, DoWork);
void DoWork()
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 啟動");
try
{
Monitor.Enter(lockObj);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 獲得 Monitor");
Thread.Sleep(100);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 呼叫 Monitor.Wait()");
Monitor.Wait(lockObj);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 重新獲得 Monitor");
}
finally
{
Monitor.Exit(lockObj);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 釋放 Monitor");
}
}
}
/// <summary>
/// 測試 Monitor.Enter(字串)
/// 因為字串會被留用,所以會導致不同執行緒間互斥存取。
/// </summary>
public void Test2()
{
var mre = new ManualResetEventSlim(false);
Task.Run(() =>
{
Console.WriteLine("按下 c 啟動");
if (Console.ReadKey().Key == ConsoleKey.C)
{
mre.Set();
}
});
Parallel.Invoke(DoWork, DoWork, DoWork);
void DoWork()
{
mre.Wait();
try
{
Monitor.Enter("1");
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 進入同步程式碼塊");
Thread.Sleep(1000);
}
finally
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 退出同步程式碼塊");
Monitor.Exit("1");
}
}
}
/// <summary>
/// 測試 Monitor.Enter(值型別)
/// 因為 Monitor.Enter(object) 引數是 object,所以值型別必須裝箱,那樣其實就會有問題了。
/// 值型別在堆疊上,沒有參照,參照型別在堆上,有參照,所以裝箱就是在堆上新建一個範例,然後複製棧上值的內容,拆箱就是把堆上範例的值,複製到棧上。
/// </summary>
public void Test3()
{
var mre = new ManualResetEventSlim(false);
var i = 1;
//Object o = i;
Task.Run(() =>
{
Console.WriteLine("按下 c 啟動");
if (Console.ReadKey().Key == ConsoleKey.C)
{
mre.Set();
}
});
Parallel.Invoke(DoWork, DoWork, DoWork);
void DoWork()
{
mre.Wait();
object o = i;
try
{
Monitor.Enter(o);
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 進入同步程式碼塊");
Thread.Sleep(1000);
}
finally
{
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 退出同步程式碼塊");
Monitor.Exit(o);
}
}
}
CountdownEvent 是一個混合構造,經常用於 fork/join 等場景,就是等待多個並行任務完成,再執行下一個任務。CountdownEvent 內部會維護一個計數,當計數為 0 時,解除執行緒的阻塞。
public void Test2()
{
var queue = new ConcurrentQueue<int>(Enumerable.Range(1, 100));
var cde = new CountdownEvent(queue.Count);
var doWork = new Action(() =>
{
while (queue.TryDequeue(out var result))
{
Thread.Sleep(100);
Console.WriteLine(result);
cde.Signal();
}
});
var _ = Task.Run(doWork); // fork
var _2 = Task.Run(doWork); // fork
var complete = new Action(() =>
{
cde.Wait(); // join
Console.WriteLine($"queue Count {queue.Count}");
});
var t = Task.Run(complete);
var t2 = Task.Run(complete);
Task.WaitAll(t, t2);
Console.WriteLine($"CountdownEvent 重新初始化");
cde.Reset(2); // 呼叫 Reset() 將 cde 重新初始化
cde.AddCount(10); // 呼叫 AddCount() cde 內部計數 + 1
var cts = new CancellationTokenSource(1000); // 測試超時機制
try
{
cde.Wait(cts.Token);
}
catch (Exception e)
{
Console.WriteLine(e);
}
cde.Dispose();
}
Barrier 是一個混合構造,可以通過 participantCount 來指定一個數值,同時會維護一個內部數值 total,每次呼叫 SignalAndWait() 的時候,阻塞呼叫執行緒,同時把total 加 1,等到 total == participantCount,呼叫 postPhaseAction,通過 postPhaseAction 來確定彙總每個執行緒的資料,並且執行下個階段的工作。
Barrier 適合一種特殊場景,把一個大任務拆分成多個小任務,然後每個小任務又會分階段執行。像是 Parallel 的 Plus 版,如果任務步驟很多,用 Parallel 來分拆很麻煩,可以考慮用 Barrier。
public class BarrierDemo
{
public void Test()
{
var words = new string[] { "山", "飛", "千", "鳥", "絕" };
var words2 = new string[] { "人", "滅", "徑", "萬", "蹤" };
var solution = "千山鳥飛絕,萬徑人蹤滅";
bool success = false;
var barrier = new Barrier(2, b =>
{
var sb = new StringBuilder();
sb.Append(string.Concat(words));
sb.Append(',');
sb.Append(string.Concat(words2));
Console.WriteLine(sb.ToString());
//Thread.Sleep(1000);
if (string.CompareOrdinal(solution, sb.ToString()) == 0)
{
success = true;
Console.WriteLine($"已完成");
}
Console.WriteLine($"當前階段數:{b.CurrentPhaseNumber}");
});
var t = Task.Run(() => DoWork(words));
var t2 = Task.Run(() => DoWork(words2));
Console.ReadLine();
void DoWork(string[] words)
{
while (!success)
{
var r = new Random();
for (int i = 0; i < words.Length; i++)
{
var swapIndex = r.Next(i, words.Length);
(words[swapIndex], words[i]) = (words[i], words[swapIndex]);
}
barrier.SignalAndWait();
}
}
}
}
ReaderWriterLockSlim 是一個混合構造。一般場景中在讀取資料的時候,不會涉及到資料的修改,所以可以並行讀取,在修改資料的時候,才會涉及到資料的修改,所以應該互斥修改。其他同步構造無論讀取還是修改資料都是鎖定的,所以 .Net 提供了一個讀寫鎖 ReaderWriterLockSlim。
ReaderWriterLockSlim 的邏輯如下:
/// <summary>
/// ReaderWriterLockerSlim 用法
/// </summary>
internal class Transaction2
{
private DateTime _timeLastTrans;
public DateTime TimeLastTrans
{
get
{
_lock.EnterReadLock();
Thread.Sleep(1000);
var t = _timeLastTrans;
Console.WriteLine($"呼叫 ReadLock {Thread.CurrentThread.ManagedThreadId}");
_lock.ExitReadLock();
return t;
}
}
private readonly ReaderWriterLockSlim _lock = new(LockRecursionPolicy.NoRecursion);
public void PerformTransaction()
{
_lock.EnterWriteLock();
_timeLastTrans = DateTime.Now;
Console.WriteLine($"呼叫 WriteLock {Thread.CurrentThread.ManagedThreadId}");
_lock.ExitWriteLock();
}
public void Test()
{
PerformTransaction();
ThreadPool.QueueUserWorkItem(_ => Console.WriteLine(TimeLastTrans));
PerformTransaction();
Thread.Sleep(500); // 就算睡眠500ms,在鎖釋放後,依舊先進行讀操作,讀完才有寫操作。
ThreadPool.QueueUserWorkItem(_ => Console.WriteLine(TimeLastTrans));
}
}
回顧了一下知識,總結了一下,發現自己又學到不少。下次回顧一下 Task 的知識。
原始碼 https://github.com/yijidao/blog/tree/master/TPL/ThreadDemo/ThreadDemo3