關於 Task 簡單梳理(C#)【並行程式設計系列】

2023-07-18 18:00:28

〇、前言

Task 是微軟在 .Net 4.0 時代推出來的,也是微軟極力推薦的一種多執行緒的處理方式。

在 Task 之前有一個高效多執行緒操作累 ThreadPool,雖然執行緒池相對於 Thread,具有很多優勢避免頻繁建立和銷燬執行緒等,但是執行緒池也有一些使用上的不便,比如不支援取消、完成、失敗通知等,也不支援執行緒執行的先後順序設定。

為了解決上述痛點,Task 誕生了。Task 就是站在巨人的肩膀上而生,它是基於 ThreadPool 封裝。Task 的控制和擴充套件性很強,線上程的延續、阻塞、取消、超時等方面遠勝於 ThreadPool。

本文將對 Task 進行一個詳細的介紹。

一、任務如何建立和啟動?

建立任務執行任務是可以分離的,也可以同時進行。如下程式碼有四種開啟任務的方式:

  • 第一種:任務 t1 通過呼叫 Task 類建構函式進行範例化,但僅在任務 t2 啟動後呼叫其 Start() 方法啟動。【建立+未啟動】
  • 第二種:任務 t2 通過呼叫 TaskFactory.StartNew(Action<Object>, Object) 方法在單個方法呼叫中範例化和啟動。【建立+啟動】
  • 第三種:任務 t3 通過呼叫 Run(Action) 方法在單個方法呼叫中範例化和啟動。【建立+啟動】
  • 第四種:任務 t4 通過呼叫 RunSynchronously() 方法在主執行緒上同步執行。【建立+未啟動】
static void Main(string[] args)
{
    // 用於非同步呼叫的委託函數,接受型別為 Object 的引數
    Action<object> action = (object obj) =>
    {
        // Task.CurrentId :任務 ID
        // Thread.CurrentThread.ManagedThreadId :執行緒 ID
        Console.WriteLine($"Task={Task.CurrentId}, obj={obj}, Thread={Thread.CurrentThread.ManagedThreadId}");
        // throw new Exception();
    };

    // 【第一種】建立一個就緒,但【未啟動】的任務,需要在後文通過 t1.Start() 啟動
    Task t1 = new Task(action, "甲"); // alpha:初始

    // 【第二種】【建立並啟動】一個任務
    Task t2 = Task.Factory.StartNew(action, "乙");
    // 佔用主執行緒,等待任務 t2 完成
    t2.Wait();

    // 啟動第一個任務 t1
    t1.Start();
    Console.WriteLine($"t1 已啟動  (主執行緒 = {Thread.CurrentThread.ManagedThreadId})");
    // 通過 Wait() 佔用主執行緒,等待 t1 執行完畢
    t1.Wait();

    // 【第三種】通過 Task.Run() 【建立並啟動】一個任務
    string taskData = "丙";
    Task t3 = Task.Run(() =>
    {
        Console.WriteLine($"Task={Task.CurrentId}, obj={taskData}, Thread={Thread.CurrentThread.ManagedThreadId}");
    });
    // 通過 Wait() 佔用主執行緒,等待 t3 執行完畢
    t3.Wait();

    // 【第四種】建立一個就緒,但【未啟動】的任務 t4
    Task t4 = new Task(action, "丁");
    // Synchronously:同步的
    // 開啟同步任務 t4,在主執行緒上執行
    t4.RunSynchronously();
    // t4 是以同步的方式執行的,此時的 Wait() 可以捕捉到異常
    t4.Wait();
    Console.ReadLine();
}

如下圖輸出結果,最先開啟的 t2,由於是工廠中啟動的,所以不佔用主執行緒執行。Task.Run() 同樣是非主執行緒執行,但它並未新開執行緒,而是直接用了 t2 執行的執行緒。

執行緒編號為 1 的是主執行緒,t1 是主執行緒最先建立的,所以直接由主執行緒執行。t4 是在同步執行的任務,因此也是主執行緒來執行。

  

二、等待一個或多個任務

用於等待任務的方法有很多個,如下:

Wait() task1.Wait() 單執行緒等待
WaitAll() Task.WaitAll(tasks) 等待任務集合 tasks 中的全部任務完成
WaitAny()  int index = Task.WaitAny(tasks) 等待任一任務完成,並返回這一任務的編號
WhenAll() Task t = Task.WhenAll(tasks) 返回一個新的任務,這個任務的完成狀態在【tasks 集合中全部任務都完成時】完成
WhenAny() Task t = Task.WhenAny(tasks) 返回在任務集合 tasks 中第一個執行完成的任務物件

下面幾個範例來實操下。

2.1 Wait()

對於 Wait() 單執行緒等待,沒啥好說的,看程式碼:

static void Main(string[] args)
{
    // 建立並執行一個任務執行匿名函數
    Task taskA = Task.Run(() => Thread.Sleep(2000));
    Console.WriteLine($"taskA Status: {taskA.Status}"); // taskA Status: WaitingToRun
    try
    {
        taskA.Wait(1000); // 主執行緒等待任務 1s 此時任務尚未完成
        Console.WriteLine($"taskA Status: {taskA.Status}"); // taskA Status: Running
        taskA.Wait(); // 執行緒等待任務 taskA 完成
        Console.WriteLine($"taskA Status: {taskA.Status}"); // taskA Status: RanToCompletion
    }
    catch (AggregateException)
    {
        Console.WriteLine("Exception in taskA.");
    }
}

2.2 Wait(Int32, CancellationToken)  支援手動取消

關於 Wait(Int32, CancellationToken) 任務可手動取消的過載。在任務完成之前,超時或呼叫了 Cancel() 方法,等待終止。

如下範例,一個執行緒一個任務,執行緒中將 CabcellationTokenSource 的範例 cts 取消掉,導致後續任務等待時呼叫 cts.Token 導致異常 OperationCanceledException 的發生。

static void Main(string[] args)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    Thread thread = new Thread(CancelToken); // 新開一個執行緒執行方法:CancelToken()
    thread.Start(cts);

    Task t = Task.Run(() => // 新增一個任務執行匿名函數
    {
        Task.Delay(5000).Wait(); // 延遲等待 5s
        Console.WriteLine("Task ended delay...");
    });
    try
    {
        Console.WriteLine($"About to wait completion of task {t.Id}"); // 以上兩個操作都有延遲,所以此處訊息先列印
        // 等待任務 t 1.51s,保證執行緒已執行完成,就是保證 CancellationTokenSource 已執行過取消操作
        // 由於 cts 已經取消,因此次數就拋異常:OperationCanceledException
        bool result = t.Wait(1510, cts.Token); // 後邊程式碼就不再執行,直接跳到 catch
        Console.WriteLine($"Wait completed normally: {result}");
        Console.WriteLine($"The task status:  {t.Status}");
    }
    catch (OperationCanceledException e)
    {
        Console.WriteLine($"{e.GetType().Name}: The wait has been canceled.");
        Console.WriteLine($"Task status:{t.Status}"); // 此時程式執行 1.5s 多,任務 t 還在等待,因此狀態是 Running
        Thread.Sleep(4000); // 4s + 1.5s > 5s 此時任務 t 已經執行完成,狀態為 RanToCompletion 
        Console.WriteLine("After sleeping, the task status:  {t.Status}");
        cts.Dispose();
    }
    Console.ReadLine();
}

private static void CancelToken(Object obj)
{
    Thread.Sleep(1500); // 延遲 1.5s
    Console.WriteLine($"Canceling the cancellation token from thread {Thread.CurrentThread.ManagedThreadId}...");
    CancellationTokenSource source = obj as CancellationTokenSource;
    if (source != null) 
        source.Cancel(); // 將 CancellationTokenSource 的範例執行取消
}

  

2.3 WaitAll()

 等待一組任務全部完成,無論是否拋異常。AggregateException 將會收集全部異常資訊,可以通過遍歷獲取每一個異常詳情。

如下程式碼,新建是個任務組成任務組 tasks,其中 2~5 執行緒手動拋異常,最後通過遍歷 AggregateException aex 記錄全部異常。

static void Main(string[] args)
{
    var tasks = new List<Task<int>>();
    // 建立一個委託,用於任務執行,並記錄每個任務資訊
    Func<object, int> action = (object obj) =>
    {
        int i = (int)obj;
        // 讓每次的 TickCount 不同(系統開始執行的毫秒數)
        Thread.Sleep(i * 1000);
        if (2 <= i && i <= 5) // 從第 2 到 5 個任務都拋異常
        {
            throw new InvalidOperationException("SIMULATED EXCEPTION");
        }
        int tickCount = Environment.TickCount; // 獲取系統開始執行的毫秒數
        Console.WriteLine($"Task={Task.CurrentId}, i={i}, TickCount={tickCount}, Thread={Thread.CurrentThread.ManagedThreadId}");
        return tickCount;
    };
    // 連續建立 10 個任務
    for (int i = 0; i < 10; i++)
    {
        int index = i;
        tasks.Add(Task<int>.Factory.StartNew(action, index)); // 後臺執行緒
    }
    try
    {
        // WaitAll() 等待全部任務完成
        Task.WaitAll(tasks.ToArray());
        // 由於執行緒中手動丟擲了異常,因此這個訊息將無法列印在控制檯
        Console.WriteLine("WaitAll() has not thrown exceptions. THIS WAS NOT EXPECTED.");
    }
    catch (AggregateException aex) // AggregateException 異常中包含 2~5 四個異常
    {
        Console.WriteLine("\nThe following exceptions have been thrown by WaitAll(): (THIS WAS EXPECTED)");
        Console.WriteLine($"\ne.InnerExceptions.Count:{aex.InnerExceptions.Count}");
        for (int j = 0; j < aex.InnerExceptions.Count; j++) // aex.InnerExceptions.Count == 4
        {
            Console.WriteLine("\n-------------------------------------------------\n{0}", aex.InnerExceptions[j].ToString());
        }
    }
    Console.ReadLine();
}

   

2.4 WaitAny()

 等待一組任務中的任一任務完成,然後返回第一個執行完成任務的序號,可通過tasks[index].Id取得任務 ID。

如下範例,每個任務都有延遲,當第一個任務完成時,遍歷列印出其他全部任務的狀態:

static void Main(string[] args)
{
    Task[] tasks = new Task[5];
    for (int ctr = 0; ctr <= 4; ctr++)
    {
        int factor = ctr; // 重新宣告一個變數
        tasks[ctr] = Task.Run(() => Thread.Sleep(factor * 250 + 50));
    }
    int index = Task.WaitAny(tasks); // 等待任一任務結束
    Console.WriteLine($"任務 #{tasks[index].Id} 已完成。");
    Console.WriteLine("\n當前各個任務的狀態:");
    foreach (var t in tasks)
        Console.WriteLine($"   Task {t.Id}: {t.Status}");
    Console.ReadLine();
}

   

參考:https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task.wait?view=net-7.0

三、延續任務 Task.ContinueWith()

3.1 一個簡單的範例

如下程式碼,首先建立一個耗時的任務 task 並啟動,此時也不影響主執行緒的執行。然後通過task.ContinueWith()在第一個任務執行完成後,執行其中的匿名函數。

static void Main(string[] args)
{
    // 建立一個任務
    Task<int> task = new Task<int>(() =>
    {
        int sum = 0;
        Console.WriteLine($"使用 Task 執行非同步操作,當前執行緒 {Thread.CurrentThread.ManagedThreadId}");
        Thread.Sleep(2000);
        for (int i = 0; i < 100; i++)
        {
            sum += i;
        }
        return sum;
    });
    // 啟動任務
    task.Start();
    // 主執行緒在此處可以執行其他處理
    Console.WriteLine($"1 主執行緒 {Thread.CurrentThread.ManagedThreadId}");
    Thread.Sleep(1000);

    //任務完成時執行處理。
    Task cwt = task.ContinueWith(t =>
    {
        Console.WriteLine($"任務完成後的執行結果:{t.Result} 當前執行緒 {Thread.CurrentThread.ManagedThreadId}");
    });
    task.Wait();
    cwt.Wait();
    Console.WriteLine($"2 主執行緒 {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadLine();
}

   

詳情可參考:https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task.continuewith?view=net-7.0

3.2 任務的並行與序列

ContinueWith、WaitAll 當這兩者結合起來,我們就可以處理複雜一點的東西。比如,現在有 7 個任務,其中 t1 需要序列,t2-t3 可以並行,t4 需要序列,t5-t6 並行,t7 序列。邏輯如下圖:

  

public static void Main(string[] args)
{
    ConcurrentStack<int> stack = new ConcurrentStack<int>(); // ConcurrentStack:執行緒安全的後進先出(LIFO:LastIn-FirstOut)集合
    ConcurrentBag<int> bag = new ConcurrentBag<int>(); // ConcurrentBag:執行緒安全的無序集合
    // t1先序列
    var t1 = Task.Factory.StartNew(() =>
    {
        stack.Push(1);
        stack.Push(2);
    });

    // t1.ContinueWith() t1 之後,t2、t3並行執行
    var t2 = t1.ContinueWith(t =>
    {
        int result;
        stack.TryPop(out result);
    });
    // t2,t3並行執行
    var t3 = t1.ContinueWith(t =>
    {
        int result;
        stack.TryPop(out result);
    });
    // 等待 t2、t3 執行完
    Task.WaitAll(t2, t3);

    //t4序列執行
    var t4 = Task.Factory.StartNew(() =>
    {
        stack.Push(1);
        stack.Push(2);
    });

    // t5、t6 並行執行
    var t5 = t4.ContinueWith(t =>
    {
        int result;
        stack.TryPop(out result);
    });
    // t5、t6 並行執行
    var t6 = t4.ContinueWith(t =>
    {
        int result;
        // 只彈出,不移除
        stack.TryPeek(out result);
    });
    // 臨界區:等待 t5、t6 執行完
    Task.WaitAll(t5, t6);

    // t7 序列執行
    var t7 = Task.Factory.StartNew(() =>
    {
        Console.WriteLine($"當前集合元素個數:{stack.Count}"); // 當前集合元素個數:1
    });
    Console.ReadLine();
}

參考: https://www.cnblogs.com/huangxincheng/archive/2012/04/03/2430638.html

https://learn.microsoft.com/zh-cn/dotnet/api/system.threading.tasks.task?view=net-7.0

https://www.cnblogs.com/zhaoshujie/p/11082753.html