.NET6中的await原理淺析

2023-11-15 12:00:55

前言

看過不少關於 await 的原理的文章,也知道背後是編譯器給轉成了狀態機實現的,但是具體是怎麼完成的,回撥又是如何銜接的,一直都沒有搞清楚,這次下定決心把原始碼自己跑了下,終於豁然開朗了

本文的演示程式碼基於 VS2022 + .NET 6

範例

public class Program
{
    static int Work()
    {
        Console.WriteLine("In Task.Run");
        return 1;
    }

    static async Task TestAsync()
    {
        Console.WriteLine("Before Task.Run");
        await Task.Run(Work);
        Console.WriteLine("After Task.Run");
    }

    static void Main()
    {
        _ = TestAsync();
        Console.WriteLine("End");
        Console.ReadKey();
    }
}
  • 很簡單的非同步程式碼,我們來看下,編譯器把它變成了啥
class Program
{
    static int Work()
    {
        Console.WriteLine("In Task.Run");
        return 1;
    }

    static Task TestAsync()
    {
        var stateMachine = new StateMachine()
        {
            _builder = AsyncTaskMethodBuilder.Create(),
            _state = -1
        };
        stateMachine._builder.Start(ref stateMachine);
        return stateMachine._builder.Task;
    }

    static void Main()
    {
        _ = TestAsync();
        Console.WriteLine("End");
        Console.ReadKey();
    }

    class StateMachine : IAsyncStateMachine
    {
        public int _state;
        public AsyncTaskMethodBuilder _builder;
        private TaskAwaiter<int> _awaiter;

        void IAsyncStateMachine.MoveNext()
        {
            int num = _state;
            try
            {
                TaskAwaiter<int> awaiter;
                if (num != 0)
                {
                    Console.WriteLine("Before Task.Run");
                    awaiter = Task.Run(Work).GetAwaiter();
                    if (!awaiter.IsCompleted)
                    {
                        _state = 0;
                        _awaiter = awaiter;
                        StateMachine stateMachine = this;
                        _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine);
                        return;
                    }
                }
                else
                {
                    awaiter = _awaiter;
                    _awaiter = default;
                    _state = -1;
                }
                awaiter.GetResult();
                Console.WriteLine("After Task.Run");
            }
            catch (Exception exception)
            {
                _state = -2;
                _builder.SetException(exception);
                return;
            }
            _state = -2;
            _builder.SetResult();
        }

        void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine) { }
    }
}
  • 編譯後的程式碼經過我的整理,命名簡化了,更容易理解

狀態機實現

  • 我們看到實際是生成了一個隱藏的狀態機類 StateMachine

  • 把狀態機的初始狀態 _state 設定 -1

  • stateMachine._builder.Start(ref stateMachine); 啟動狀態機,內部實際呼叫的就是狀態機的 MoveNext 方法

  • Task.Run 建立一個任務, 把委託放在 Task.m_action 欄位,丟到執行緒池,等待排程

  • 任務線上程池內被排程完成後,是怎麼回到這個狀態機繼續執行後續程式碼的呢?
    _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine); 就是關鍵了, 跟下去,到了如下的程式碼:

    if (!this.AddTaskContinuation(stateMachineBox, false))
    {
        ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, true);
    }
    bool AddTaskContinuation(object tc, bool addBeforeOthers)
    {
        return !this.IsCompleted && ((this.m_continuationObject == null && Interlocked.CompareExchange(ref this.m_continuationObject, tc, null) == null) || this.AddTaskContinuationComplex(tc, addBeforeOthers));
    }
    
    • 這裡很清楚的看到,嘗試把狀態機物件(實際是狀態機的包裝類),賦值到 Task.m_continuationObject, 如果操作失敗,則把狀態機物件丟進執行緒池等待排程,這裡為什麼這麼實現,看一下執行緒池是怎麼執行的就清楚了

執行緒池實現

  • .NET6 的執行緒池實現,實際是放到了 PortableThreadPool, 具體偵錯步驟我就不放了,直接說結果就是, 執行緒池執行緒從任務佇列中拿到任務後都執行了 DispatchWorkItem 方法
static void DispatchWorkItem(object workItem, Thread currentThread)
{
    Task task = workItem as Task;
    if (task != null)
    {
        task.ExecuteFromThreadPool(currentThread);
        return;
    }
    Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
}
virtual void ExecuteFromThreadPool(Thread threadPoolThread)
{
    this.ExecuteEntryUnsafe(threadPoolThread);
}
  • 我們看到, 執行緒池佇列中的任務都是 object 型別的, 這裡進行了型別判斷, 如果是 Task , 直接執行 task.ExecuteFromThreadPool, 更有意思的這個方法是個虛方法,後面說明

  • ExecuteFromThreadPool 繼續追下去,我們來到了這裡,程式碼做了簡化

    private void ExecuteWithThreadLocal(ref Task currentTaskSlot, Thread threadPoolThread = null)
    {
        this.InnerInvoke();
        this.Finish(true);
    }
    
    virtual void InnerInvoke()
    {
        Action action = this.m_action as Action;
        if (action != null)
        {
            action();
            return;
        }
    }
    
  • 很明顯 this.InnerInvoke 就是執行了最開始 Task.Run(Work) 封裝的委託了, 在 m_action 欄位

  • this.Finish(true); 跟下去會發現會呼叫 FinishStageTwo 設定任務的完成狀態,異常等, 繼續呼叫 FinishStageThree 就來了重點: FinishContinuations 這個方法就是銜接後續回撥的核心

    internal void FinishContinuations()
    {
        object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel);
        if (obj != null)
        {
            this.RunContinuations(obj);
        }
    }
    
  • 還記得狀態機實現麼, Task.m_continuationObject 欄位實際儲存的就是狀態機的包裝類,這裡執行緒池執行緒也會判斷這個欄位有值的話,就直接使用它執行後續程式碼了

    void RunContinuations(object continuationObject)
    {
        var asyncStateMachineBox = continuationObject as IAsyncStateMachineBox;
        if (asyncStateMachineBox != null)
        {
            AwaitTaskContinuation.RunOrScheduleAction(asyncStateMachineBox, flag2);
            return;
        }
    }
    
    static void RunOrScheduleAction(IAsyncStateMachineBox box, bool allowInlining)
    {
        if (allowInlining && AwaitTaskContinuation.IsValidLocationForInlining)
        {
            box.MoveNext();
            return;
        }
    }
    

總結

  1. Task.Run 建立 Task, 把委託放在 m_action 欄位, 把 Task 壓入執行緒池佇列,等待排程
  2. _builder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine); 嘗試把狀態機物件放在 Task.m_continuationObject 欄位上,等待執行緒池執行緒排程完成任務後使用(用來執行後續),若操作失敗,直接把狀態機物件壓入執行緒池佇列,等待排程
  3. 執行緒池執行緒排程任務完成後,會判斷 Task.m_continuationObject 有值,直接執行它的 MoveNext

備註

  1. 狀態機實現中,嘗試修改 Task.m_continuationObject,可能會失敗,
    就會直接把狀態機物件壓入執行緒池, 但是執行緒池排程,不都是判斷是不是 Task 型別麼, 其實狀態機的包裝類是 Task 的子類,哈哈,是不是明白了

    class AsyncStateMachineBox<TStateMachine> : Task<TResult>, IAsyncStateMachineBox where TStateMachine : IAsyncStateMachine
    
    static void DispatchWorkItem(object workItem, Thread currentThread)
    {
        Task task = workItem as Task;
        if (task != null)
        {
            task.ExecuteFromThreadPool(currentThread);
            return;
        }
        Unsafe.As<IThreadPoolWorkItem>(workItem).Execute();
    }
    
  • 還有就是狀態機包裝類,重寫了 Task.ExecuteFromThreadPool,所以執行緒池呼叫 task.ExecuteFromThreadPool 就是直接呼叫了狀態機的 MoveNext 了, Soga ^_^
    override void ExecuteFromThreadPool(Thread threadPoolThread)
    {
        this.MoveNext(threadPoolThread);
    }
    
參考連結
  • 關於執行緒池和非同步的更深刻的原理,大家可以參考下面的文章

概述 .NET 6 ThreadPool 實現: https://www.cnblogs.com/eventhorizon/p/15316955.html

.NET Task 揭祕(2):Task 的回撥執行與 await: https://www.cnblogs.com/eventhorizon/p/15912383.html