c# 非同步進階———— 自定義 taskschedule[三]

2023-04-24 12:01:54

前言

我們知道我們的task async 和 await 是基於執行緒池進行排程的。

但是async 和 await 也就是使用了預設的task排程,讓其線上程池中執行。

但是執行緒池是榨乾機器效能為本質的,但是有時候我們執行一些我們自己的需求,比如控制一下執行緒數(因為並不是執行緒數越高,就能有更高的效能),控制一下cpu使用,避免cpu使用太高。

正文

首先我們需要一個佇列,因為我們需要讓task進行儲存到某個地方,這裡選擇佇列,因為它簡單,也一般符合我們先進先出(先到先執行)的想法。

public sealed class BlockingQueue<T>
{
	private readonly Queue<T> _queue = new Queue<T>();

	private readonly object _lock = new object();

	private readonly Semaphore _pool= new Semaphore(0, int.MaxValue);

	public void Enqueue(T item)
	{
		lock (_lock)
		{
			_queue.Enqueue(item);
		}
	}

	public T Dequeue()
	{
		_pool.WaitOne();
		lock(_lock)
		{
			return _queue.Dequeue();
		}
	}
}

實現一個佇列,那麼希望是執行緒安全的,所以要給其進出加上lock。

同時希望,如果佇列中為空的時候能夠進行等待,不至於一直去輪詢。

這裡使用的是Semaphore,執行緒號誌,這個在後面會介紹到。

然後就到了實現執行緒池排程的時候:

public class TaskThreadPool : TaskScheduler, IDisposable
{
	private readonly BlockingQueue<Task> _queue = new BlockingQueue<Task>();

	private Thread[] _threads;
	private bool _disposed;
	private readonly object _lock = new object();

	public int ThreadCount { get; }

	public TaskThreadPool(int threadCount, bool isBackground = false)
	{
		if (threadCount < 1)
		{
			throw new ArgumentOutOfRangeException(nameof(threadCount), "Must be at least 1");
		}

		ThreadCount = threadCount;
		_threads = new Thread[threadCount];
		for (int i = 0; i < threadCount; i++)
		{
			_threads[i] = new Thread(ExcuteTasks)
			{
				IsBackground = isBackground 
			};
			_threads[i].Start();
		}
	}

	public Task Run(Action action) =>
		Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, this);

	private void ExcuteTasks()
	{
		while (true)
		{
			var task = _queue.Dequeue();
			if (task == null)
			{
				return;
			}

			TryExecuteTask(task);
		}
	}

	protected override IEnumerable<Task>? GetScheduledTasks()
	{
		return _queue.ToArray();
	}

	protected override void QueueTask(Task task)
	{
		_queue.Enqueue(task);
	}

	protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
	{
		if (_disposed)
		{
			throw new ObjectDisposedException(typeof(TaskThreadPool).FullName);
		}

		return !taskWasPreviouslyQueued && TryExecuteTask(task);
	}

	public void Dispose()
	{
		lock (_lock)
		{
			if (_disposed)
			{
				return;
			}

			_disposed = true;
		}
		
		for (int i = 0; i < _threads.Length; i++)
			_queue.Enqueue(null);

		foreach (var thread in _threads)
			thread.Join();

		_threads = null;
		_queue.Dispose();
	}
}

程式碼也很簡單常規,就是初始化多少個執行緒作為執行緒池,然後Task排隊執行就行了,記得要釋放資源。

這裡dispose讓其他執行緒進行停止的訊號為:_queue.Enqueue(null).

private void ExcuteTasks()
{
	while (true)
	{
		var task = _queue.Dequeue();
		if (task == null)
		{
			return;
		}

		TryExecuteTask(task);
	}
}

其他執行緒消費到null,那麼就應該停止了。

然後有一個run方法,可以直接讓action,放進來執行:

public Task Run(Action action) =>
		Task.Factory.StartNew(action, CancellationToken.None, TaskCreationOptions.None, this);

總結一下基本思路:

  1. 需要實現TaskScheduler,這樣可以避免自己寫一些任務執行的邏輯控制

  2. 因為使用了號誌,所以BlockingQueue,然後 TaskThreadPool 需要使用到BlockingQueue,所以需要加上IDispose

  3. 需要控制執行緒數,並在物件銷燬的時候禁止新的task進入,執行完已經加入佇列的任務

  4. 需要有一個run方法,這樣對外提供方便

考慮到號誌的釋放,那麼也完善了blockingqueue:

public sealed class BlockingQueue<T> : IDisposable
{
	private readonly Queue<T> _queue = new Queue<T>();

	private readonly object _lock = new object();

	private readonly Semaphore _pool= new Semaphore(0, int.MaxValue);

	public void Enqueue(T item)
	{
		lock (_lock)
		{
			_queue.Enqueue(item);
		}
	}

	public T Dequeue()
	{
		_pool.WaitOne();
		lock(_lock)
		{
			return _queue.Dequeue();
		}
	}

	public IEnumerable<T> ToArray()
	{
		return _queue.ToArray();
	}

	public void Dispose()
	{
		_pool.Dispose();
	}
}

簡單寫了一下自定義的執行緒池,上文中介紹到了Semaphore 這個號誌,下一節為Semaphore 的介紹和實現原理。