我沒能實現始終在一個執行緒上執行 task

2023-04-03 09:00:12

前文我們總結了在使用常駐任務實現常駐執行緒時,應該注意的事項。但是我們最終沒有提到如何在處理對於帶有非同步程式碼的辦法。本篇將接受筆者對於該內容的總結。

如何識別當前程式碼跑在什麼執行緒上

一切開始之前,我們先來使用一種簡單的方式來識別當前程式碼執行在哪種執行緒上。

最簡單的方式就是列印當前執行緒名稱和執行緒ID來識別。

private static void ShowCurrentThread(string work)
{
    Console.WriteLine($"{work} - {Thread.CurrentThread.Name} - {Thread.CurrentThread.ManagedThreadId}");
}

通過這段程式碼,我們可以非常容易的識別三種不同情況下的執行緒資訊。

[Test]
public void ShowThreadMessage()
{
    new Thread(() => { ShowCurrentThread("Custom thread work"); })
    {
        IsBackground = true,
        Name = "Custom thread"
    }.Start();

    Task.Run(() => { ShowCurrentThread("Task.Run work"); });
    Task.Factory.StartNew(() => { ShowCurrentThread("Task.Factory.StartNew work"); },
        TaskCreationOptions.LongRunning);

    Thread.Sleep(TimeSpan.FromSeconds(1));
}
// output
// Task.Factory.StartNew work - .NET Long Running Task - 17
// Custom thread work - Custom thread - 16
// Task.Run work - .NET ThreadPool Worker - 12

分別為:

  • 自定義執行緒 Custom thread
  • 執行緒池執行緒 .NET ThreadPool Worker
  • 由 Task.Factory.StartNew 建立的新執行緒 .NET Long Running Task

因此,結合我們之前曇花執行緒的例子,我們也可以非常簡單的看出執行緒的切換情況:

[Test]
public void ShortThread()
{
    new Thread(async () =>
    {
        ShowCurrentThread("before await");
        await Task.Delay(TimeSpan.FromSeconds(0.5));
        ShowCurrentThread("after await");
    })
    {
        IsBackground = true,
        Name = "Custom thread"
    }.Start();
    Thread.Sleep(TimeSpan.FromSeconds(1));
}
// output
// before await - Custom thread - 16
// after await - .NET ThreadPool Worker - 6

我們希望在同一個執行緒上執行 Task 程式碼

之前我們已經知道了,手動建立執行緒並控制執行緒的執行,可以確保自己的程式碼不會於執行緒池執行緒產生競爭,從而使得我們的常駐任務能夠穩定的觸發。

當時用於演示的錯誤範例是這樣的:

[Test]
public void ThreadWaitTask()
{
    new Thread(async () =>
    {
        ShowCurrentThread("before await");
        Task.Run(() =>
        {
            ShowCurrentThread("inner task");
        }).Wait();
        ShowCurrentThread("after await");
    })
    {
        IsBackground = true,
        Name = "Custom thread"
    }.Start();
    Thread.Sleep(TimeSpan.FromSeconds(1));
}
// output
// before await - Custom thread - 16
// inner task - .NET ThreadPool Worker - 13
// after await - Custom thread - 16

這個範例可以明顯的看出,中間的部分程式碼是執行線上程池的。這種做法會線上程池資源緊張的時候,導致我們的常駐任務無法觸發。

因此,我們需要一種方式來確保我們的程式碼在同一個執行緒上執行。

那麼接下來我們分析一些想法和效果。

加配!加配!加配!

我們已經知道了,實際上,常駐任務不能穩定觸發是因為 Task 會線上程池中執行。那麼增加執行緒池的容量自然就是最直接解決高峰的做法。 因此,如果條件允許的話,直接增加 CPU 核心數實際上是最為有效和簡單的方式。

不過這種做法並不適用於一些類庫的編寫者。比如,你在編寫紀錄檔類庫,那麼其實無法欲知使用者所處的環境。並且正如大家所見,市面上幾乎沒有紀錄檔類庫中由說明讓使用者只能在一定的 CPU 核心數下使用。

因此,如果您的常駐任務是在類庫中,那麼我們需要一種更為通用的方式來解決這個問題。

考慮使用同步過載

在 Task 出現之後,很多時候我們都會考慮使用非同步過載的方法。這顯然不是錯誤的做法,因為這可以使得我們的程式碼更加高效,提升系統的吞吐量。但是,如果你想要讓 Thread 穩定的在同一個執行緒上執行,那麼你需要考慮使用同步過載的方法。通過同步過載方法,我們的程式碼將不會出現執行緒切換到執行緒池的情況。自然也就實現了我們的目的。

總是使用 TaskCreationOptions.LongRunning

這個辦法其實很不實際。因為任何一層沒有指定,都會將工作切換到執行緒池中。

[Test]
public void AlwaysLogRunning()
{
    new Thread(async () =>
    {
        ShowCurrentThread("before await");
        Task.Factory.StartNew(() =>
        {
            ShowCurrentThread("LongRunning task");
            Task.Run(() => { ShowCurrentThread("inner task"); }).Wait();
        }, TaskCreationOptions.LongRunning).Wait();
        ShowCurrentThread("after await");
    })
    {
        IsBackground = true,
        Name = "Custom thread"
    }.Start();
    Thread.Sleep(TimeSpan.FromSeconds(1));
}
// output
// before await - Custom thread - 16
// LongRunning task - .NET Long Running Task - 17
// inner task - .NET ThreadPool Worker - 7
// after await - Custom thread - 16

所以說,這個辦法可以用。但其實很怪。

自定義 Scheduler

這是一種可行,但是非常困難的做法。雖然說自定義個簡單的 Scheduler 也不是很難,只需要實現幾個簡單的方法。但要按照我們的需求來實現這個 Scheduler 並不簡單。

比如我們嘗試實現一個這樣的 Scheduler:

注意:這個 Scheduler 並不能正常工作。

class MyScheduler : TaskScheduler
{
    private readonly Thread _thread;
    private readonly ConcurrentQueue<Task> _tasks = new();

    public MyScheduler()
    {
        _thread = new Thread(() =>
        {
            while (true)
            {
                while (_tasks.TryDequeue(out var task))
                {
                    TryExecuteTask(task);
                }
            }
        })
        {
            IsBackground = true,
            Name = "MyScheduler"
        };
        _thread.Start();
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks;
    }

    protected override void QueueTask(Task task)
    {
        _tasks.Enqueue(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }
}

上面的程式碼中,我們期待通過一個單一的執行緒來執行所有的任務。但實際上它反而是一個非常簡單的死鎖演示裝置。

我們設想執行下面這段程式碼:

[Test]
public async Task TestLongRunningConfigureAwait()
{
    var scheduler = new MyScheduler();
    await Task.Factory.StartNew(() =>
    {
        ShowCurrentThread("BeforeWait");
        Task.Factory
            .StartNew(() =>
                {
                    ShowCurrentThread("AfterWait");
                }
                , CancellationToken.None, TaskCreationOptions.None, scheduler)
            .Wait();
        ShowCurrentThread("AfterWait");
    }, CancellationToken.None, TaskCreationOptions.None, scheduler);
}

這段程式碼中,我們期待,在一個 Task 中執行另外一個 Task。但實際上,這段程式碼會死鎖。

因為,我們的 MyScheduler 中,我們在一個死迴圈中,不斷的從佇列中取出任務並執行。但是,我們的任務中,又會呼叫 Wait 方法。

我們不妨設想這個執行緒就是我們自己。

  1. 首先,老闆交代給你一件任務,你把它放到佇列中。
  2. 然後你開始執行這件任務,執行到一半發現,你需要等待第二件任務的執行結果。因此你在這裡等著。
  3. 但是第二件任務這個時候也塞到了你的佇列中。
  4. 這下好了,你手頭的任務在等待你佇列裡面的任務完成。而你佇列的任務只有你才能完成。
  5. 完美卡死。

因此,其實實際上我們需要在 Wait 的時候通知當前執行緒,此時執行緒被 Block 了,然後轉而從佇列中取出任務執行。在 Task 於 ThreadPool 的配合中,是存在這樣的機制的。但是,我們自己實現的 MyScheduler 並不能與 Task 產生這種配合。因此需要考慮自定義一個 Task。跟進一步說,我們需要自定義 AsyncMethodBuilder 來實現全套的自定義。

顯然者是一項相對高階內容,期待了解的讀者,可以通過 UniTask1 專案來了解如何實現這樣的全套自定義。

總結

如果你期望在常駐執行緒能夠穩定的執行你的任務。那麼:

  1. 加配,以避免執行緒池不夠用
  2. 考慮在這部分程式碼中使用同步程式碼
  3. 可以學習自定義 Task 系統

參考

感謝閱讀,如果覺得本文有用,不妨點選推薦