long-running task 是指那些長時間執行的任務,比如在一個 while True 中執行耗時較長的同步處理。
下面的例子中,我們不斷從佇列中嘗試取出資料,並對這些資料進行處理,這樣的任務就適合交給一個 long-running task 來處理。
var queue = new BlockingCollection<string>();
Task.Factory.StartNew(() =>
{
while (true)
{
// BlockingCollection<T>.Take() 方法會阻塞當前執行緒,直到佇列中有資料可以取出。
var input = queue.Take();
Console.WriteLine($"You entered: {input}");
}
}, TaskCreationOptions.LongRunning);
while (true)
{
var input = Console.ReadLine();
queue.Add(input);
}
在 .NET 中,我們可以使用 Task.Factory.StartNew 方法並傳入 TaskCreationOptions.LongRunning 來建立一個 long-running task。
雖然這種方式建立的 long-running task 和預設建立的 task 一樣,都是分配給 ThreadPoolTaskScheduler 來排程的, 但 long-running task 會被分配到一個新的 Background 執行緒上執行,而不是交給 ThreadPool 中的執行緒來執行。
class ThreadPoolTaskScheduler : TaskScheduler
{
// ...
protected internal override void QueueTask(Task task)
{
TaskCreationOptions options = task.Options;
if (Thread.IsThreadStartSupported && (options & TaskCreationOptions.LongRunning) != 0)
{
// 在一個新的 Background 執行緒上執行 long-running task。
new Thread(s_longRunningThreadWork)
{
IsBackground = true,
Name = ".NET Long Running Task"
}.UnsafeStart(task);
}
else
{
// 非 long-running task 交給 ThreadPool 中的執行緒來執行。
ThreadPool.UnsafeQueueUserWorkItemInternal(task, (options & TaskCreationOptions.PreferFairness) == 0);
}
}
// ...
}
如果一個task持續佔用一個執行緒,那麼這個執行緒就不能被其他的task使用,這和 ThreadPool 的設計初衷是相違背的。
如果在 ThreadPool 中建立了大量的 long-running task,那麼就會導致
ThreadPool 中的執行緒不夠用,從而影響到其他的 task 的執行。
有時候,我們需要在 long-running task 中呼叫一個 async 方法。比如下面的例子中,我們需要在 long-running task 中呼叫一個 async
的方法來處理資料。
var queue = new BlockingCollection<string>();
Task.Factory.StartNew(async () =>
{
while (true)
{
var input = queue.Take();
Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
await ProcessAsync(input);
Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}
}, TaskCreationOptions.LongRunning);
async Task ProcessAsync(string input)
{
// 模擬一個非同步操作。
await Task.Delay(100);
Console.WriteLine($"You entered: {input}, thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}
while (true)
{
var input = Console.ReadLine();
queue.Add(input);
}
TaskScheduler InternalCurrentTaskScheduler()
{
var propertyInfo = typeof(TaskScheduler).GetProperty("InternalCurrent", BindingFlags.Static | BindingFlags.NonPublic);
return (TaskScheduler)propertyInfo.GetValue(null);
}
連續輸入 1、2、3、4,輸出如下:
1
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 1, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
2
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 2, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
3
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 3, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
4
Before process: thread id: 4, task scheduler: , thread pool: True
You entered: 4, thread id: 4, task scheduler: , thread pool: True
After process: thread id: 4, task scheduler: , thread pool: True
從執行結果中可以看出,第一次 await 之前,當前執行緒是 long-running task 所在的執行緒(thread id: 9),此後就變成了 ThreadPool
中的執行緒(thread id: 4)。
至於為什麼之後一直是 ThreadPool 中的執行緒(thread id: 4),這邊做一下簡單的解釋。在我以前一篇介紹 await 的文章中介紹了 await 的執行過程,以及 await 之後的程式碼會在哪個執行緒上執行。
https://www.cnblogs.com/eventhorizon/p/15912383.html
執行緒池的介紹請參考我另一篇部落格
https://www.cnblogs.com/eventhorizon/p/15316955.html
回到本文的主題,如果在 long-running task 使用了 await 呼叫一個 async 方法,就會導致為 long-running task 分配的獨立執行緒提前退出,和我們的預期不符。
在 long-running task 中呼叫一個 async 方法,可以使用 Task.Wait 來阻塞當前執行緒,直到 async 方法執行完畢。
對於 Task.Factory.StartNew 建立出來的 long-running task 來說,因為其繫結了 ThreadPoolTaskScheduler,就算是使用 Task.Wait
阻塞了當前執行緒,也不會導致死鎖。
並且 Task.Wait 會把異常丟擲來,所以我們可以在 catch 中處理異常。
// ...
Task.Factory.StartNew( () =>
{
while (true)
{
var input = queue.Take();
Console.WriteLine($"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
ProcessAsync(input).Wait();
Console.WriteLine($"After process: thread id: {Thread.CurrentThread.ManagedThreadId}");
}
}, TaskCreationOptions.LongRunning);
// ...
輸出如下:
1
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 1, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
2
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 2, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
3
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 3, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
4
Before process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
You entered: 4, thread id: 5, task scheduler: , thread pool: True
After process: thread id: 9, task scheduler: System.Threading.Tasks.ThreadPoolTaskScheduler, thread pool: False
Task.Wait 並不會對 async 方法內部產生影響,所以 async 方法內部的程式碼還是按照正常的邏輯執行。這邊 ProcessAsync 方法內部列印的
thread id 沒變純粹是因為 ThreadPool 目前就只建立了一個執行緒,你可以瘋狂輸入看看結果。
關於 Task.Wait 的使用,可以參考我另一篇部落格
https://www.cnblogs.com/eventhorizon/p/17481757.html
Task.Factory.StartNew(async () =>
{
while (true)
{
var input = queue.Take();
Console.WriteLine(
$"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
await ProcessAsync(input);
Console.WriteLine(
$"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}
}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());
class CustomerTaskScheduler : TaskScheduler
{
// 這邊的 BlockingCollection 只是舉個例子,如果是普通的佇列,配合鎖也是可以的。
private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();
public CustomerTaskScheduler()
{
var thread = new Thread(() =>
{
foreach (var task in _tasks.GetConsumingEnumerable())
{
TryExecuteTask(task);
}
})
{
IsBackground = true
};
thread.Start();
}
protected override IEnumerable<Task> GetScheduledTasks()
{
return _tasks;
}
protected override void QueueTask(Task task)
{
_tasks.Add(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
return false;
}
}
輸出如下:
1
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 1, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
2
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 2, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
3
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 3, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
4
Before process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
You entered: 4, thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
After process: thread id: 9, task scheduler: CustomerTaskScheduler, thread pool: False
因為修改了上下文繫結的 TaskScheduler,會影響到 async 方法內部 await 回撥的執行。
這種做法不推薦使用,因為可能會導致死鎖。
如果我將 await 改成 Task.Wait,就會導致死鎖。
Task.Factory.StartNew(() =>
{
while (true)
{
var input = queue.Take();
Console.WriteLine(
$"Before process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
ProcessAsync(input).Wait();
Console.WriteLine(
$"After process: thread id: {Thread.CurrentThread.ManagedThreadId}, task scheduler: {InternalCurrentTaskScheduler()}, thread pool: {Thread.CurrentThread.IsThreadPoolThread}");
}
}, CancellationToken.None, TaskCreationOptions.None, new CustomerTaskScheduler());
輸出如下:
1
Before process: thread id: 7, task scheduler: CustomerTaskScheduler, thread pool: False
後面就沒有輸出了,因為死鎖了,除非我們在 ProcessAsync 方法內部每個 await 的 Task 後加上ConfigureAwait(false)。
同理,同學們也可以嘗試用 SynchronizationContext 來實現類似的效果,同樣有死鎖的風險。
如果你想要在一個 long-running task 中執行 async 方法,使用 await 關鍵字會導致 long-running task 的獨立執行緒提前退出。
比較推薦的做法是使用 Task.Wait。如果連續執行多個 async 方法,建議將這些 async 方法封裝成一個新方法,然後只 Wait 這個新方法的 Task。