c# 非同步進階———— paralel [二]

2023-04-22 21:01:10

前言

簡單整理一下paralel,以上是並行的意思。

正文

我們在工作中常常使用task await 和 async,也就是將執行緒池進行了封裝,那麼還有一些更高階的應用。

是對task的封裝,那麼來看下paralel。

static void Main(string[] args)
{
	var ints= Enumerable.Range(1, 100);
	var result = Parallel.ForEach(ints, arg =>
	{
		Console.WriteLine(arg);
	});
	
	Console.Read();
}

可以看到結果是並行的。

那麼來看下實現機制。

public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
{
	if (source == null)
	{
		throw new ArgumentNullException(nameof(source));
	}
	if (body == null)
	{
		throw new ArgumentNullException(nameof(body));
	}

	return ForEachWorker<TSource, object>(
		source, s_defaultParallelOptions, body, null, null, null, null, null, null);
}

進行引數檢驗,然後交給了ForEachWorker。

這是一個基本的程式碼思路,就是複雜的方法中可以先校驗引數,然後具體實現交給另外一個方法。

然後通過不同的型別,進行分類:

然後看下具體實現是什麼?

進去看就是一個taskreplicator:

看下run在做什麼。

public static void Run<TState>(ReplicatableUserAction<TState> action, ParallelOptions options, bool stopOnFirstFailure)
{
	int maxConcurrencyLevel = (options.EffectiveMaxConcurrencyLevel > 0) ? options.EffectiveMaxConcurrencyLevel : int.MaxValue;

	TaskReplicator replicator = new TaskReplicator(options, stopOnFirstFailure);
	new Replica<TState>(replicator, maxConcurrencyLevel, CooperativeMultitaskingTaskTimeout_RootTask, action).Start();

	Replica nextReplica;
	while (replicator._pendingReplicas.TryDequeue(out nextReplica))
		nextReplica.Wait();

	if (replicator._exceptions != null)
		throw new AggregateException(replicator._exceptions);
}
  1. 建立了一個taskreplictor,起到管理作用

  2. 然後建立了一個Replica,然後這個start 是關鍵

  3. 然後通過while,讓每一個Replica 都執行完畢才推出,達到同步的效果

if (replicator._exceptions != null)
	throw new AggregateException(replicator._exceptions);

可以看一下這個,這個是一個比較好的技巧。如果一個執行管理,不用丟擲異常,之間在管理中進行執行處理總結。

比如結果,異常等。

那麼就看下這個start。

protected Replica(TaskReplicator replicator, int maxConcurrency, int timeout)
{
	_replicator = replicator;
	_timeout = timeout;
	_remainingConcurrency = maxConcurrency - 1;
	_pendingTask = new Task(s => ((Replica)s).Execute(), this);
	_replicator._pendingReplicas.Enqueue(this);
}

public void Start()
{
	_pendingTask.RunSynchronously(_replicator._scheduler);
}

將會執行Execute,是同步的,而不是非同步的,也就是說第一個task將會執行在當前執行緒。

那麼看Execute在做什麼?

public void Execute()
{
	try
	{
		if (!_replicator._stopReplicating && _remainingConcurrency > 0)
		{
			CreateNewReplica();
			_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
		}

		bool userActionYieldedBeforeCompletion;

		ExecuteAction(out userActionYieldedBeforeCompletion);

		if (userActionYieldedBeforeCompletion)
		{
			_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
			_pendingTask.Start(_replicator._scheduler);
		}
		else
		{
			_replicator._stopReplicating = true;
			_pendingTask = null;
		}
	}
	catch (Exception ex)
	{
		LazyInitializer.EnsureInitialized(ref _replicator._exceptions).Enqueue(ex);
		if (_replicator._stopOnFirstFailure)
			_replicator._stopReplicating = true;
		_pendingTask = null;
	}
}

一段一段分析:

if (!_replicator._stopReplicating && _remainingConcurrency > 0)
{
	CreateNewReplica();
	_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
}

這裡當_replicator 也就是任務複製器沒有停止的時候。這裡有兩種情況會停止,一種是任務完成,一種是任務異常且設定引數異常時候停止。

_remainingConcurrency 指的是副本數,預設是int.max。

那麼就複製一個副本。

protected override void CreateNewReplica()
{
	Replica<TState> newReplica = new Replica<TState>(_replicator, _remainingConcurrency, GenerateCooperativeMultitaskingTaskTimeout(), _action);
	newReplica._pendingTask.Start(_replicator._scheduler);
}

複製完副本後,那麼就開始執行我們的action了。

protected override void ExecuteAction(out bool yieldedBeforeCompletion)
{
	_action(ref _state, _timeout, out yieldedBeforeCompletion);
}

這裡傳入了timeout,這個timeout並不是我們限制我們單個task的執行時間,而是當執行到一定時候後,這個task就停止執行,然後另外啟動一個副本。

if (CheckTimeoutReached(loopTimeout))
{
	replicationDelegateYieldedBeforeCompletion = true;
	break;
}
if (userActionYieldedBeforeCompletion)
{
	_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
	_pendingTask.Start(_replicator._scheduler);
}
else
{
	_replicator._stopReplicating = true;
	_pendingTask = null;
}

這個是為了符合作業系統的排程思想,跑的越久的,基本上優先順序會低些。

那麼看下這個_action主要在做什麼吧。

while (myPartition.MoveNext())
{
	KeyValuePair<long, TSource> kvp = myPartition.Current;
	long index = kvp.Key;
	TSource value = kvp.Value;

	// Update our iteration index
	if (state != null) state.CurrentIteration = index;

	if (simpleBody != null)
		simpleBody(value);
	else if (bodyWithState != null)
		bodyWithState(value, state);
	else if (bodyWithStateAndIndex != null)
		bodyWithStateAndIndex(value, state, index);
	else if (bodyWithStateAndLocal != null)
		localValue = bodyWithStateAndLocal(value, state, localValue);
	else
		localValue = bodyWithEverything(value, state, index, localValue);

	if (sharedPStateFlags.ShouldExitLoop(index)) break;

	// Cooperative multitasking:
	// Check if allowed loop time is exceeded, if so save current state and return.
	// The task replicator will queue up a replacement task. Note that we don't do this on the root task.
	if (CheckTimeoutReached(loopTimeout))
	{
		replicationDelegateYieldedBeforeCompletion = true;
		break;
	}
}

就是拉取我們的enumerator的資料,然後simpleBody(value),進行執行我們寫的action。

總結一下,其實Parallel 核心就是一個任務複製器,然後建立多個副本,拉取我們的資料,進行執行我們設定的action。

裡面的主要功能,Parallel做到了限制副本數,因為我們知道task並不是越多越好。

第二個,如果長時間執行,那麼Parallel是做了優化的,當達到timeout的時候,那麼會重新啟動一個副本(可以理解為一個執行緒)

第三點,Parallel 有一個foreach 進行迭代器的處理,這裡不僅僅是讓任務可以並行。

而且具備c# foreach的基本功能。

static void Main(string[] args)
{
	var ints= Enumerable.Range(1, 100);
	var result = Parallel.ForEach(ints,    (arg, state)
		=>
	{
		if (state.IsStopped)
		{
			return;   
		}
		
		if (arg > 18)
		{
			state.Break();
		}
	});
	if (result.IsCompleted)
	{
		Console.WriteLine("完成");
	}
	Console.Read();
}

可以進行中斷。

還有一個函數,那就是stop,這個stop 比break 停止的快,break 要記錄出,最小中斷位置。

而stop 就是立馬停止下來。

在上述中,我們知道可以傳遞一個taskschedule進行,那麼這個taskschedule 是幹什麼的,對我們的任務排程有什麼影響呢? 下一節,自我實現taskschedule。