我們知道我們的task async 和 await 是基於執行緒池進行排程的。
但是async 和 await 也就是使用了預設的task排程,讓其線上程池中執行。
但是執行緒池是榨乾機器效能為本質的,但是有時候我們執行一些我們自己的需求,比如控制一下執行緒數(因為並不是執行緒數越高,就能有更高的效能),控制一下cpu使用,避免cpu使用太高。
首先我們需要一個佇列,因為我們需要讓task進行儲存到某個地方,這裡選擇佇列,因為它簡單,也一般符合我們先進先出(先到先執行)的想法。
public sealed class BlockingQueue<T>
{
private readonly Queue<T> _queue = new Queue<T>();
private readonly object _lock = new object();
private readonly Semaphore _pool= new Semaphore(0, int.MaxValue);
public void Enqueue(T item)
{
lock (_lock)
{
_queue.Enqueue(item);
}
}
public T Dequeue()
{
_pool.WaitOne();
lock(_lock)
{
return _queue.Dequeue();
}
}
}
實現一個佇列,那麼希望是執行緒安全的,所以要給其進出加上lock。
同時希望,如果佇列中為空的時候能夠進行等待,不至於一直去輪詢。
這裡使用的是Semaphore,執行緒號誌,這個在後面會介紹到。
然後就到了實現執行緒池排程的時候:
public class TaskThreadPool : TaskScheduler, IDisposable
{
private readonly BlockingQueue<Task> _queue = new BlockingQueue<Task>();
private Thread[] _threads;
private bool _disposed;
private readonly object _lock = new object();
public int ThreadCount { get; }
public TaskThreadPool(int threadCount, bool isBackground = false)
{
if (threadCount < 1)
{
throw new ArgumentOutOfRangeException(nameof(threadCount), "Must be at least 1");
}
ThreadCount = threadCount;
_threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
_threads[i] = new Thread(ExcuteTasks)
{
IsBackground = isBackground
};
_threads[i].Start();
}
}
public Task Run(Action action) =>
Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, this);
private void ExcuteTasks()
{
while (true)
{
var task = _queue.Dequeue();
if (task == null)
{
return;
}
TryExecuteTask(task);
}
}
protected override IEnumerable<Task>? GetScheduledTasks()
{
return _queue.ToArray();
}
protected override void QueueTask(Task task)
{
_queue.Enqueue(task);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
if (_disposed)
{
throw new ObjectDisposedException(typeof(TaskThreadPool).FullName);
}
return !taskWasPreviouslyQueued && TryExecuteTask(task);
}
public void Dispose()
{
lock (_lock)
{
if (_disposed)
{
return;
}
_disposed = true;
}
for (int i = 0; i < _threads.Length; i++)
_queue.Enqueue(null);
foreach (var thread in _threads)
thread.Join();
_threads = null;
_queue.Dispose();
}
}
程式碼也很簡單常規,就是初始化多少個執行緒作為執行緒池,然後Task排隊執行就行了,記得要釋放資源。
這裡dispose讓其他執行緒進行停止的訊號為:_queue.Enqueue(null).
private void ExcuteTasks()
{
while (true)
{
var task = _queue.Dequeue();
if (task == null)
{
return;
}
TryExecuteTask(task);
}
}
其他執行緒消費到null,那麼就應該停止了。
然後有一個run方法,可以直接讓action,放進來執行:
public Task Run(Action action) =>
Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, this);
總結一下基本思路:
需要實現TaskScheduler,這樣可以避免自己寫一些任務執行的邏輯控制
因為使用了號誌,所以BlockingQueue,然後 TaskThreadPool 需要使用到BlockingQueue,所以需要加上IDispose
需要控制執行緒數,並在物件銷燬的時候禁止新的task進入,執行完已經加入佇列的任務
需要有一個run方法,這樣對外提供方便
考慮到號誌的釋放,那麼也完善了blockingqueue:
public sealed class BlockingQueue<T> : IDisposable
{
private readonly Queue<T> _queue = new Queue<T>();
private readonly object _lock = new object();
private readonly Semaphore _pool= new Semaphore(0, int.MaxValue);
public void Enqueue(T item)
{
lock (_lock)
{
_queue.Enqueue(item);
}
}
public T Dequeue()
{
_pool.WaitOne();
lock(_lock)
{
return _queue.Dequeue();
}
}
public IEnumerable<T> ToArray()
{
return _queue.ToArray();
}
public void Dispose()
{
_pool.Dispose();
}
}
簡單寫了一下自定義的執行緒池,上文中介紹到了Semaphore 這個號誌,下一節為Semaphore 的介紹和實現原理。