經過仔細考慮其他答案,我決定爲我的用途創建自定義QueuedTaskScheduler
更容易,因爲我不需要擔心異步任務或IO完成(儘管這給了我一些想法)。
首先,當我們從孩子的工作池搶工作,我們增加了一個信號量鎖,裏面FindNextTask_NeedsLock
:
var items = queueForTargetTask._workItems;
if (items.Count > 0
&& queueForTargetTask.TryLock() /* This is added */)
{
targetTask = items.Dequeue();
對於專門的線程版本,裏面ThreadBasedDispatchLoop
:
// ... and if we found one, run it
if (targetTask != null)
{
queueForTargetTask.ExecuteTask(targetTask);
queueForTargetTask.Release();
}
對於任務調度程序版本,內部ProcessPrioritizedAndBatchedTasks
:
// Now if we finally have a task, run it. If the task
// was associated with one of the round-robin schedulers, we need to use it
// as a thunk to execute its task.
if (targetTask != null)
{
if (queueForTargetTask != null)
{
queueForTargetTask.ExecuteTask(targetTask);
queueForTargetTask.Release();
}
else
{
TryExecuteTask(targetTask);
}
}
當我們創建新的子隊列:
/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
/// <returns>The newly created and activated queue at priority 0 and max concurrency of 1.</returns>
public TaskScheduler ActivateNewQueue() { return ActivateNewQueue(0, 1); }
/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary>
/// <param name="priority">The priority level for the new queue.</param>
/// <returns>The newly created and activated queue at the specified priority.</returns>
public TaskScheduler ActivateNewQueue(int priority, int maxConcurrency)
{
// Create the queue
var createdQueue = new QueuedTaskSchedulerQueue(priority, maxConcurrency, this);
...
}
最後,嵌套QueuedTaskSchedulerQueue
內:
// This is added.
private readonly int _maxConcurrency;
private readonly Semaphore _semaphore;
internal bool TryLock()
{
return _semaphore.WaitOne(0);
}
internal void Release()
{
_semaphore.Release();
_pool.NotifyNewWorkItem();
}
/// <summary>Initializes the queue.</summary>
/// <param name="priority">The priority associated with this queue.</param>
/// <param name="maxConcurrency">Max concurrency for this scheduler.</param>
/// <param name="pool">The scheduler with which this queue is associated.</param>
internal QueuedTaskSchedulerQueue(int priority, int maxConcurrency, QueuedTaskScheduler pool)
{
_priority = priority;
_pool = pool;
_workItems = new Queue<Task>();
// This is added.
_maxConcurrency = maxConcurrency;
_semaphore = new Semaphore(_maxConcurrency, _maxConcurrency);
}
我希望有人試圖做同樣的我和交錯無序的任務,這可能是有用的在單個易於使用的調度程序(可以使用默認線程池或其他任何調度程序)上執行有序任務。
=== UPDATE ===
由Stephen Cleary的啓發下,我最終使用:
private static readonly Lazy<TaskScheduler> Scheduler = new Lazy<TaskScheduler>(
() => new WorkStealingTaskScheduler(16));
public static TaskScheduler Default
{
get
{
return Scheduler.Value;
}
}
public static TaskScheduler CreateNewOrderedTaskScheduler()
{
return new QueuedTaskScheduler(Default, 1);
}
多少次任務調度,你需要什麼?我明白你的任務有依賴性,這就是你爲什麼要(部分)訂購它們的原因。你可以用ContinueWith鏈來做到這一點。 – usr 2014-11-04 12:22:18
@usr我們沒有上限。可能相當多。我們有一個通用的命令調度程序層 - 某些命令類型不應該平行運行。例如。數據集A的Get和Set命令應該排隊,以便在並行運行時不會發生剪切讀取。在這些排隊並且串行執行的同時,數據集B的Get命令可以在父調度器上並行運行。編輯:理想情況下,我們不希望使用ContinueWith鏈,因爲調度層的通用(&火和遺忘)性質。 – jamespconnor 2014-11-04 13:08:07
你只需要跟蹤任何給定鏈中的最新任務。當一個新的任務進入時,您將設置下一個任務並存儲新的任務。你放棄舊的。替代解決方案:每個鏈有一個SemaphoreSlim,並使用'await sem.WaitAsync()'來非常靈活地手動控制DOP。我不認爲調度程序是正確的抽象。考慮使用普通任務組合器和協調原語。 – usr 2014-11-04 13:15:53