3

我要尋找一個TaskScheduler是:可以交錯任務有限公司併發的TaskScheduler被明令

  1. 讓我定義一些專用線程(例如8) - 一個標準的LimitedConcurrencyLevelTaskScheduler(使用線程池線程)或WorkStealingTaskScheduler這樣做。
  2. 允許我創建完全排序的子任務調度程序,但會調度父調度程序的專用線程上的任務。

目前我們使用TaskScheduler.Default作爲普通池(在線程池生長算法等的擺佈下)和new OrderedTaskScheduler(),無論何時我們要訂購任務。我想保持這種行爲,但將這兩個要求限制在我自己的專用線程池中。

QueuedTaskScheduler似乎變得非常接近。我認爲QueuedTaskScheduler.ActivateNewQueue()方法,它返回一個孩子TaskScheduler將執行任務從父母的工人池中的任務,但似乎並非如此。子TaskSchedulers似乎與父級具有相同的並行級別。

我不一定希望child taskscheduler任務優先於父任務調度程序任務(儘管它可能是未來的一個很好的功能)。

我在這裏看到一個相關的問題:Limited concurrency level task scheduler (with task priority) handling wrapped tasks但我的需求不需要處理異步任務(我所有入隊的任務都是從頭到尾完全同步,沒有延續)。

+0

多少次任務調度,你需要什麼?我明白你的任務有依賴性,這就是你爲什麼要(部分)訂購它們的原因。你可以用ContinueWith鏈來做到這一點。 – usr 2014-11-04 12:22:18

+0

@usr我們沒有上限。可能相當多。我們有一個通用的命令調度程序層 - 某些命令類型不應該平行運行。例如。數據集A的Get和Set命令應該排隊,以便在並行運行時不會發生剪切讀取。在這些排隊並且串行執行的同時,數據集B的Get命令可以在父調度器上並行運行。編輯:理想情況下,我們不希望使用ContinueWith鏈,因爲調度層的通用(&火和遺忘)性質。 – jamespconnor 2014-11-04 13:08:07

+3

你只需要跟蹤任何給定鏈中的最新任務。當一個新的任務進入時,您將設置下一個任務並存儲新的任務。你放棄舊的。替代解決方案:每個鏈有一個SemaphoreSlim,並使用'await sem.WaitAsync()'來非常靈活地手動控制DOP。我不認爲調度程序是正確的抽象。考慮使用普通任務組合器和協調原語。 – usr 2014-11-04 13:15:53

回答

5

我假設「完全有序」,你也意味着「一次一個」。

在這種情況下,我認爲有built-in solution that should do quite well: ConcurrentExclusiveSchedulerPair

你的「父母」調度會併發調度:

TaskScheduler _parent = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 8) 
    .ConcurrentScheduler; 

與「孩子」調度會使用併發調度下獨家調度:

var myScheduler = new ConcurrentExclusiveSchedulerPair(_parent).ExclusiveScheduler; 
+0

我剛看到這個。有趣的用法是,我放在'ExclusiveScheduler'上的有序工作是否會獲得對父調度器的完全鎖定,並阻止我將其推到'ConcurrentScheduler'上的無序工作? – jamespconnor 2014-11-04 18:42:08

+0

回答我自己的評論 - 啊不,我看到你已經爲每個孩子創造了一對全新的。有趣。謝謝你,我會檢查出來的。看起來好像比我自己的'QueuedTaskScheduler'更好的方法 – jamespconnor 2014-11-04 18:44:14

1

我知道你的任務有依賴關係,這就是爲什麼你想(部分)命令他們。你可以用ContinueWith鏈來做到這一點。你只需要跟蹤任何給定鏈中的最新任務。當一個新的任務進入時,您將設置下一個任務並存儲新的任務。你放棄舊的。

替代解決方案:具有每一個鏈和SemaphoreSlim使用await sem.WaitAsync()手動控制DOP非常靈活地。請注意,信號量異步等待而不是阻止任何線程。它只會導致少量的內存使用。根本沒有使用OS資源。您可以使用極其多的信號量。

我不認爲調度是正確的抽象。調度程序用於基於CPU的工作。其他協調工具可以與包括異步IO的任何Task一起使用。考慮更喜歡普通任務組合器和協調原語。

+0

感謝@usr將您的意見轉化爲正式答案。我接受這個答案,因爲最終我認爲這是我需要的正確方向。特別是,我可能會更傾向於使用IO完成端口進行異步Web調用,這些端口不會與我當前發佈的解決方案一起工作,作爲另一個答案。 – jamespconnor 2014-11-04 18:39:32

2

經過仔細考慮其他答案,我決定爲我的用途創建自定義QueuedTaskScheduler更容易,因爲我不需要擔心異步任務或IO完成(儘管這給了我一些想法)。

首先,當我們從孩子的工作池搶工作,我們增加了一個信號量鎖,裏面FindNextTask_NeedsLock

var items = queueForTargetTask._workItems; 
if (items.Count > 0 
    && queueForTargetTask.TryLock() /* This is added */) 
{ 
    targetTask = items.Dequeue(); 

對於專門的線程版本,裏面ThreadBasedDispatchLoop

// ... and if we found one, run it 
if (targetTask != null) 
{ 
    queueForTargetTask.ExecuteTask(targetTask); 
    queueForTargetTask.Release(); 
} 

對於任務調度程序版本,內部ProcessPrioritizedAndBatchedTasks

// Now if we finally have a task, run it. If the task 
// was associated with one of the round-robin schedulers, we need to use it 
// as a thunk to execute its task. 
if (targetTask != null) 
{ 
    if (queueForTargetTask != null) 
    { 
     queueForTargetTask.ExecuteTask(targetTask); 
     queueForTargetTask.Release(); 
    } 
    else 
    { 
     TryExecuteTask(targetTask); 
    } 
} 

當我們創建新的子隊列:

/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary> 
/// <returns>The newly created and activated queue at priority 0 and max concurrency of 1.</returns> 
public TaskScheduler ActivateNewQueue() { return ActivateNewQueue(0, 1); } 

/// <summary>Creates and activates a new scheduling queue for this scheduler.</summary> 
/// <param name="priority">The priority level for the new queue.</param> 
/// <returns>The newly created and activated queue at the specified priority.</returns> 
public TaskScheduler ActivateNewQueue(int priority, int maxConcurrency) 
{ 
    // Create the queue 
    var createdQueue = new QueuedTaskSchedulerQueue(priority, maxConcurrency, this); 

    ... 
} 

最後,嵌套QueuedTaskSchedulerQueue內:

// This is added. 
private readonly int _maxConcurrency; 
private readonly Semaphore _semaphore; 

internal bool TryLock() 
{ 
    return _semaphore.WaitOne(0); 
} 

internal void Release() 
{ 
    _semaphore.Release(); 
    _pool.NotifyNewWorkItem(); 
} 

/// <summary>Initializes the queue.</summary> 
/// <param name="priority">The priority associated with this queue.</param> 
/// <param name="maxConcurrency">Max concurrency for this scheduler.</param> 
/// <param name="pool">The scheduler with which this queue is associated.</param> 
internal QueuedTaskSchedulerQueue(int priority, int maxConcurrency, QueuedTaskScheduler pool) 
{ 
    _priority = priority; 
    _pool = pool; 
    _workItems = new Queue<Task>(); 

    // This is added. 
    _maxConcurrency = maxConcurrency; 
    _semaphore = new Semaphore(_maxConcurrency, _maxConcurrency); 
} 

我希望有人試圖做同樣的我和交錯無序的任務,這可能是有用的在單個易於使用的調度程序(可以使用默認線程池或其他任何調度程序)上執行有序任務。

=== UPDATE ===

由Stephen Cleary的啓發下,我最終使用:

private static readonly Lazy<TaskScheduler> Scheduler = new Lazy<TaskScheduler>(
    () => new WorkStealingTaskScheduler(16)); 

public static TaskScheduler Default 
{ 
    get 
    { 
     return Scheduler.Value; 
    } 
} 

public static TaskScheduler CreateNewOrderedTaskScheduler() 
{ 
    return new QueuedTaskScheduler(Default, 1); 
}