C# 執行緒查漏補缺

2023-01-31 15:01:59

程序和執行緒

不同程式執行需要進行排程和獨立的記憶體空間

在單核計算機中,CPU 是獨佔的,記憶體是共用的,這時候執行一個程式的時候是沒有問題。但是執行多個程式的時候,為了不發生一個程式霸佔整個 CPU 不釋放的情況(如一個程式死迴圈無法結束了,那麼其他程式就沒有機會執行了),就需要開發者給不同程式劃分不同的執行時間。為了避免不同程式之間互相運算元據或程式碼,導致程式被破壞的情況,就需要開發者給程式劃分獨立的記憶體範圍。也就是程式需要開發者進行排程以及和劃分獨立的記憶體空間。

程序是應用程式的一個範例

為了避免每個開發者來進行這個工作,所以有了作業系統。作業系統負責整個計算機的程式排程,讓每個程式都有機會使用CPU,同時使用來程序來為程式維護一個獨立虛擬空間,確保程式間的執行不會互相干擾。所以程序就是程式的一個範例,擁有程式需要使用的資源集合,確保自己的資源不會被其他程序破壞。

執行緒是作業系統進行排程的最小單位

這時候一個程序一次只能處理一個任務,如果需要一邊不停輸出 hellowork,一邊計時,那麼需要啟動兩個程序。如果需要對一個佇列同時入隊出隊,那麼不僅需要兩個程序,還需要兩個程序可以存取相同的記憶體空間。所以為了程序可以並行地處理任務,同時共用相同的資源,就需要給程序一個更小的排程單位,也就是執行緒,因此,執行緒也叫輕量化程序。所以在現代計算機中,操作繼續不會直接排程程序在 CPU 上執行,而是排程執行緒在 CPU 上執行,所以說,執行緒是作業系統進行排程的最小單位。

執行緒操作

新建執行緒、啟動執行緒、執行緒優先順序

public void Test()
{
    var t = new Thread(() => { }); // 使用無參委託
    var t2 = new Thread(state => { }); // 使用 object? 引數委託
    var t3 = new Thread(DoWork);
    var t4 = new Thread(DoWork2);
    t.Priority = ThreadPriority.Highest; // 設定執行緒的優先順序,預設是 ThreadPriority.Normal
    t.Start(); // 不傳入引數,啟動執行緒
    t2.Start("引數"); // 傳入引數,啟動執行緒

    void DoWork() {}
    void DoWork2(object? state) {}
}

阻塞執行緒的執行

  1. 當執行緒呼叫 Sleep() 或者等待鎖時,進入阻塞狀態。
public void Test()
{
    var pool = new SemaphoreSlim(0, 1);
    var t = new Thread(DoWork);
    var t2 = new Thread(DoWork2);
    t.Start();
    t2.Start();

    void DoWork()
    {
        pool.Wait(); // 等待號誌
    }
    void DoWork2()
    {
        Thread.Sleep(Timeout.Infinite); // 永久休眠
    }
}
  1. Thread.Sleep() 不僅用於休眠,也可以用於讓出當前 CPU 時間,讓其他正在等待 CPU 的執行緒也有機會搶到 CPU 時間。
    tip:相似的方法,Thread.Yield() 也有讓出 CPU 時間的功能。
    tip:不同的方法,Thread.SpinWait() 不會讓出 CPU 控制權,而是進行自旋。
Thread.Sleep(0) 讓出控制權給同等優先順序的執行緒執行,如果沒有,就繼續執行本執行緒。
Thread.Sleep(1) 讓出控制權給正在等待的執行緒執行。
Thread.Yield() 讓出控制權給CPU上的其他執行緒。
Thread.SpinWait() 不讓出控制權,在CPU上自旋一段時間。

中斷阻塞中的執行緒

當執行緒處於阻塞狀態時,其他執行緒呼叫阻塞執行緒的 Thread.Interrupt() 時,會中斷執行緒並丟擲 System.Threading.ThreadInterruptedException。
tip:如果執行緒沒有處於阻塞狀態,那麼呼叫 Thread.Interrupt() 則不會有效果。

public void Test3()
{
    var sleepSwitch = false;
    var pool = new SemaphoreSlim(0, 1);

    var t = new Thread(DoWork);
    t.Start();
    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 呼叫 {t.ManagedThreadId} 的 Interrupt()");
    t.Interrupt();
    Thread.Sleep(3000);
    sleepSwitch = true;

    var t2 = new Thread(DoWork2);
    t2.Start();
    Thread.Sleep(2000);
    Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 呼叫 {t2.ManagedThreadId} 的 Interrupt()");
    t2.Interrupt();

    void DoWork()
    {
        try
        {
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: 開始執行");
            while (!sleepSwitch)
            {
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: 自旋 SpinWait()");
                Thread.SpinWait(10000000); // 只是進行自旋,不阻塞執行緒,所以不會被中斷
            }
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: 休眠 Sleep()");
            Thread.Sleep(Timeout.Infinite);
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }
    }

    void DoWork2()
    {
        try
        {
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}: 開始執行");
            pool.Wait();
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }
    }
}

取消執行緒的執行

取消正在執行中或者阻塞中的執行緒有多種方法

  • 呼叫 Thread.Interrupt() 中斷執行緒
  • 呼叫 CancellationTokenSource.Cancel() 或者超時取消
  • 通過 WaitHandle 超時取消
  1. 取消正在執行的執行緒
/// <summary>
/// 使用 CancellationToken 取消處於死迴圈的執行緒,或者超時取消
/// </summary>
public void Test2()
{
    var cts = new CancellationTokenSource(5000);

    Task.Run(() =>
    {
        Console.WriteLine("按下 c 取消執行緒,或者五秒後取消");

        if (Console.ReadKey().Key == ConsoleKey.C)
        {
            cts.Cancel();
        }
    });
    var t = new Thread(DoWork);
    t.Start(cts.Token);

    void DoWork(object? state)
    {
        var ct = (CancellationToken)state;
        while (!ct.IsCancellationRequested)
        {
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 自旋");
            Thread.SpinWait(10000000);
        }
        Console.WriteLine("結束執行");
    }
}
  1. 取消正在阻塞或者執行的執行緒
/// <summary>
/// 使用 WaitHandle.WaitAny 取消被阻塞的執行緒,或者超時取消,或者使用 CancellationToken 協助式取消
/// </summary>
public void Test3()
{
    var pool = new Semaphore(0, 1);
    var cts = new CancellationTokenSource();
    
    Task.Run(() =>
    {
        Console.WriteLine("按下 c 呼叫 CancellationTokenSource.Cancel() 取消執行緒,或者按下 v 呼叫 Semaphore.Release() 取消執行緒,或者五秒後取消");

        switch (Console.ReadKey().Key)
        {
            case ConsoleKey.C:
                cts.Cancel();
                break;
            case ConsoleKey.V:
                pool.Release();
                break;
        }

        if (Console.ReadKey().Key == ConsoleKey.C)
        {
            cts.Cancel();
        }
    });

    var t = new Thread(DoWork);
    t.Start();

    void DoWork()
    {
        var signalIndex = WaitHandle.WaitAny(new WaitHandle[] { pool, cts.Token.WaitHandle }, 5000);

        if (signalIndex == 0)
        {
            Console.WriteLine("呼叫 Semaphore.Release() 取消執行緒");
        }
        else if (cts.Token.IsCancellationRequested)
        {
            Console.WriteLine("CancellationTokenSource.Cancel() 取消執行緒");
        }
        else if (signalIndex == WaitHandle.WaitTimeout)
        {
            Console.WriteLine("超時取消");
        }
        Console.WriteLine("結束執行");
    }
}

執行緒異常和執行緒返回值

當呼叫 Thread.Abort() 或者 Thread.Interrupt() 就會丟擲異常,執行緒執行的程式碼也會丟擲異常,所以執行緒出現異常是很常見的。
當直接新建執行緒並執行,或者呼叫 ThreadPool.QueueUserWorkItem() 使用執行緒池執行緒執行程式碼,出現未捕獲的異常時,會導致程式崩潰。
線上程中執行方法,是無法直接知道方法是否執行完畢,或者得到返回值的。
避免未捕獲異常導致程式崩潰或者得到在其他執行緒執行方法的返回值,所以可以使用 Task.Run() 來執行程式碼,Task 已經處理了未捕獲異常,也可以直接得到返回值。
也可以使用委託包裝一下執行緒執行的程式碼,變成一個能安全執行的程式碼。

internal class ThreadExceptionTest
{
    public async void Test()
    {
        ThreadPool.QueueUserWorkItem(_ => ThreadThrowException()); // 未捕獲異常導致程式崩潰
        
        var t = new Thread(_ => ThreadThrowException()); // 未捕獲異常導致程式崩潰
        t.IsBackground = true;
        t.Start();

        var _ = Task.Run(ThreadThrowException); // 未捕獲異常也不會導致程式崩潰
        string? r = null;
        Exception? e = null;

        var t2 = new Thread(_ => SafeExecute(ThreadReturnValue, out r, out e)); // 通過委託獲取返回值
        t2.Start();
        t2.Join();
        Console.WriteLine(r);

        var t3 = new Thread(_ => SafeExecute(ThreadThrowException, out r, out e)); // 通過委託處理異常
        t3.Start();
        t3.Join();
        Console.WriteLine(e);

        Console.WriteLine(await SafeExecute(ThreadReturnValue)); // 通過委託獲取返回值

        try
        {
            await SafeExecute(ThreadThrowException); // 通過委託處理異常
        }
        catch (Exception exception)
        {
            Console.WriteLine(exception);
        }
    }
    
    public string ThreadThrowException()
    {
        Thread.Sleep(1000);
        throw new Exception("執行緒異常");
    }
    public string ThreadReturnValue()
    {
        Thread.Sleep(1000);
        return "done";
    }
    
    /// <summary>
    /// 捕獲異常,並通過 out 獲取返回值
    /// </summary>
    public void SafeExecute<T>(Func<T> func, out T? r, out Exception? e)
    {
        try
        {
            e = null;
            r = func();
        }
        catch (Exception? exception)
        {
            r = default;
            e = exception;
        }
    }
    
    /// <summary>
    /// 捕獲異常,並通過 TaskCompletionSource 獲取返回值
    /// </summary>
    public Task<T> SafeExecute<T>(Func<T> func)
    {
        var t = new TaskCompletionSource<T>();
        try
        {
            t.TrySetResult(func());
        }
        catch (Exception e)
        {
            t.SetException(e);
        }

        return t.Task;
    }

}

插槽和 ThreadStatic

.Net 提供了兩種執行緒相關變數的方法。

  • 插槽
    Thread.AllocateDataSlot() Thread.AllocateDataSlot() 可以給方法設定一個執行緒插槽,插槽裡面的值是執行緒相關的,也就是每個執行緒特有的,同一個變數不同執行緒無法互相修改。一般在靜態構造方法中初始化。
    Thread.GetData() Thread.SetData() 可以對插槽取值和賦值。
    插槽是動態的,在執行時進行賦值的,而且 Thread.GetData() 返回值是 object,如果執行緒所需的值型別不固定,可以使用插槽。
  • ThreadStaticAttribute
    ThreadStaticAttribute 標記靜態變數時,該變數是執行緒相關的,不同執行緒的靜態變數值是不一樣的。
    [ThreadStatic] IDE 可以提供編譯檢查,效能和安全性更好,如果執行緒所需的值型別是固定的,就應該使用 [ThreadStatic]。

tip: 插槽和 [ThreadStatic] 中的值一般不初始化,因為跟執行緒相關,在哪個執行緒初始化,只有那個執行緒可以看到這個初始化後的值,所以初始化也就沒啥意義了。

internal class ThreadDemo
{
    /// <summary>
    /// 測試 ThreadStaticAttribute
    /// </summary>
    public void Test()
    {
        Parallel.Invoke(StaticThreadDemo.Test, StaticThreadDemo.Test, StaticThreadDemo.Test); // 列印對應執行緒的ID,證明被 [ThreadStatic] 標記過的欄位是執行緒相關的。
    }

    /// <summary>
    /// 測試 LocalDataStoreSlot
    /// </summary>
    public void Test2()
    {
        Parallel.Invoke(StaticThreadDemo.Test2, StaticThreadDemo.Test2, StaticThreadDemo.Test2); // 列印對應執行緒的ID,證明 LocalDataStoreSlot 是執行緒相關的。
    }
}

static class StaticThreadDemo
{
    [ThreadStatic]
    private static int? _threadId = null;

    public static void Test()
    {
        _threadId = Thread.CurrentThread.ManagedThreadId;
        Thread.Sleep(500);
        Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}  ThreadStatic: {_threadId}");
    }

    private static LocalDataStoreSlot _localSlot;

    static StaticThreadDemo()
    {
        _localSlot = Thread.AllocateDataSlot();
        
    }

    public static void Test2()
    {
        Thread.SetData(_localSlot, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(500);
        Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId} LocalSlot:{Thread.GetData(_localSlot)}");
    }
}

執行緒池操作

執行緒需要維護自己的棧和上下文,新建執行緒是有空間(一個執行緒大概需要 1M 記憶體)和時間(CPU 切換執行緒的時間)上的開銷的,所以一般不會手動新建執行緒並執行程式碼,而是把程式碼交給執行緒池操作,執行緒池會根據電腦的 CPU 核數初始化執行緒數量,根據執行緒忙碌情況新增執行緒。
Task.Run() 最終也是通過執行緒池執行非同步操作的。

讓執行緒池裡的執行緒執行程式碼

ThreadPool.QueueUserWorkItem((state) => { Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}"); });

使用 WaitHandle 控制執行緒池程式碼的執行

ThreadPool.RegisterWaitForSingleObject() 提供了一種方法,傳入一個 WaitHandle 子類或者定時執行執行緒池的程式碼。

internal class ThreadPoolDemo
{
    public void Test()
    {
        var ti = new TaskInfo
        {
            Info = "其他資訊"
        };
        var are = new AutoResetEvent(false);
        var handle = ThreadPool.RegisterWaitForSingleObject(are, DoWork, ti, 2000, false); // 定時 2s 執行
        //var handle = ThreadPool.RegisterWaitForSingleObject(are, DoWork, ti, Timeout.Infinite, false); // 也可以不定時執行
        ti.WaitHandle = handle;

        Thread.Sleep(3000);
        Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 重新 signal AutoResetEvent");
        are.Set();
        Thread.Sleep(2000);
        Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 第二次 signal AutoResetEvent");
        are.Set(); // 呼叫後沒有反應,證明 CallBack 已經被取消註冊



        void DoWork(object? state, bool timeout)
        {
            if (timeout)
            {
                Console.WriteLine("超時");
            }
            else
            {
                var taskInfo = (TaskInfo)state;
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} 執行完畢,取消 Callback");

                taskInfo.WaitHandle.Unregister(null); // 取消回撥,不然會回撥會一直迴圈執行,而且應該用 Unregister 來取消,只在建構函式裡面指定 executeOnlyOnce:true 的話,可能會無法 gc 回撥。
            }
        }
    }

    class TaskInfo
    {
        public RegisteredWaitHandle WaitHandle { get; set; }

        public string Info { get; set; }
    }
}

最後

其實日常開發都是用 Task,回顧一下 Thread 可以寫出更加優秀的非同步程式碼,下次回顧一下執行緒同步的知識。
原始碼 https://github.com/yijidao/blog/tree/master/TPL/ThreadDemo/ThreadDemo3