2012-11-14 50 views
5

我很難找到一個任務調度程序,我可以在該任務調度程序上安排優先任務,但也可以處理「包裝」任務。它類似於Task.Run試圖解決的問題,但不能將任務調度程序指定爲Task.Run。 我一直在使用Parallel Extensions Extras Samples中的QueuedTaskScheduler來解決任務優先級要求(也由此post建議)。處理包裝任務的有限併發級別任務調度程序(帶任務優先級)

這是我的例子:

class Program 
{ 
    private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1); 
    private static TaskScheduler ts_priority1; 
    private static TaskScheduler ts_priority2; 
    static void Main(string[] args) 
    { 
     ts_priority1 = queueScheduler.ActivateNewQueue(1); 
     ts_priority2 = queueScheduler.ActivateNewQueue(2); 

     QueueValue(1, ts_priority2); 
     QueueValue(2, ts_priority2); 
     QueueValue(3, ts_priority2); 
     QueueValue(4, ts_priority1); 
     QueueValue(5, ts_priority1); 
     QueueValue(6, ts_priority1); 

     Console.ReadLine();   
    } 

    private static Task QueueTask(Func<Task> f, TaskScheduler ts) 
    { 
     return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts); 
    } 

    private static Task QueueValue(int i, TaskScheduler ts) 
    { 
     return QueueTask(async() => 
     { 
      Console.WriteLine("Start {0}", i); 
      await Task.Delay(1000); 
      Console.WriteLine("End {0}", i); 
     }, ts); 
    } 
} 

上面的例子的典型輸出是:

Start 4 
Start 5 
Start 6 
Start 1 
Start 2 
Start 3 
End 4 
End 3 
End 5 
End 2 
End 1 
End 6 

我要的是:

Start 4 
End 4 
Start 5 
End 5 
Start 6 
End 6 
Start 1 
End 1 
Start 2 
End 2 
Start 3 
End 3 

編輯:

我想我正在尋找一個類似於QueuedTaskScheduler的任務調度程序,它可以解決這個問題。但歡迎任何其他建議。

+0

那麼,你想要的是處理任務的優先級,但不能在並行模式下運行它們?你能不能限制調度程序中併發線程的數量? – Kek

+0

@Kek'new QueuedTaskScheduler(targetScheduler:TaskScheduler.Default,maxConcurrencyLevel:1);'上面完全是這樣(限制併發線程的數量爲1) –

回答

2

我能找到的最好的解決辦法是讓我自己QueuedTaskScheduler(在Parallel Extensions Extras Samples源代碼中發現原)的版本。

我在QueuedTaskScheduler的構造函數中添加了一個bool awaitWrappedTasks參數。

public QueuedTaskScheduler(
     TaskScheduler targetScheduler, 
     int maxConcurrencyLevel, 
     bool awaitWrappedTasks = false) 
{ 
    ... 
    _awaitWrappedTasks = awaitWrappedTasks; 
    ... 
} 

public QueuedTaskScheduler(
     int threadCount, 
     string threadName = "", 
     bool useForegroundThreads = false, 
     ThreadPriority threadPriority = ThreadPriority.Normal, 
     ApartmentState threadApartmentState = ApartmentState.MTA, 
     int threadMaxStackSize = 0, 
     Action threadInit = null, 
     Action threadFinally = null, 
     bool awaitWrappedTasks = false) 
{ 
    ... 
    _awaitWrappedTasks = awaitWrappedTasks; 

    // code starting threads (removed here in example) 
    ... 
} 

我然後修改ProcessPrioritizedAndBatchedTasks()方法爲async

private async void ProcessPrioritizedAndBatchedTasks() 

我然後只是其中執行預定任務的部件之後修改的代碼:

private async void ProcessPrioritizedAndBatchedTasks() 
{ 
    bool continueProcessing = true; 
    while (!_disposeCancellation.IsCancellationRequested && continueProcessing) 
    { 
     try 
     { 
      // Note that we're processing tasks on this thread 
      _taskProcessingThread.Value = true; 

      // Until there are no more tasks to process 
      while (!_disposeCancellation.IsCancellationRequested) 
      { 
       // Try to get the next task. If there aren't any more, we're done. 
       Task targetTask; 
       lock (_nonthreadsafeTaskQueue) 
       { 
        if (_nonthreadsafeTaskQueue.Count == 0) break; 
        targetTask = _nonthreadsafeTaskQueue.Dequeue(); 
       } 

       // If the task is null, it's a placeholder for a task in the round-robin queues. 
       // Find the next one that should be processed. 
       QueuedTaskSchedulerQueue queueForTargetTask = null; 
       if (targetTask == null) 
       { 
        lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); 
       } 

       // Now if we finally have a task, run it. If the task 
       // was associated with one of the round-robin schedulers, we need to use it 
       // as a thunk to execute its task. 
       if (targetTask != null) 
       { 
        if (queueForTargetTask != null) queueForTargetTask.ExecuteTask(targetTask); 
        else TryExecuteTask(targetTask); 

        // ***** MODIFIED CODE START **** 
        if (_awaitWrappedTasks) 
        { 
         var targetTaskType = targetTask.GetType(); 
         if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0])) 
         { 
          dynamic targetTaskDynamic = targetTask; 
          // Here we await the completion of the proxy task. 
          // We do not await the proxy task directly, because that would result in that await will throw the exception of the wrapped task (if one existed) 
          // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash) 
          await TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously); 
         } 
        } 
        // ***** MODIFIED CODE END **** 
       } 
      } 
     } 
     finally 
     { 
      // Now that we think we're done, verify that there really is 
      // no more work to do. If there's not, highlight 
      // that we're now less parallel than we were a moment ago. 
      lock (_nonthreadsafeTaskQueue) 
      { 
       if (_nonthreadsafeTaskQueue.Count == 0) 
       { 
        _delegatesQueuedOrRunning--; 
        continueProcessing = false; 
        _taskProcessingThread.Value = false; 
       } 
      } 
     } 
    } 
} 

方法的變化ThreadBasedDispatchLoop有點不同,因爲我們不能使用async關鍵字,否則我們會打破ex的功能在專用線程中處理預定任務。所以這裏的ThreadBasedDispatchLoop

private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally) 
{ 
    _taskProcessingThread.Value = true; 
    if (threadInit != null) threadInit(); 
    try 
    { 
     // If the scheduler is disposed, the cancellation token will be set and 
     // we'll receive an OperationCanceledException. That OCE should not crash the process. 
     try 
     { 
      // If a thread abort occurs, we'll try to reset it and continue running. 
      while (true) 
      { 
       try 
       { 
        // For each task queued to the scheduler, try to execute it. 
        foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)) 
        { 
         Task targetTask = task; 
         // If the task is not null, that means it was queued to this scheduler directly. 
         // Run it. 
         if (targetTask != null) 
         { 
          TryExecuteTask(targetTask); 
         } 
         // If the task is null, that means it's just a placeholder for a task 
         // queued to one of the subschedulers. Find the next task based on 
         // priority and fairness and run it. 
         else 
         { 
          // Find the next task based on our ordering rules...          
          QueuedTaskSchedulerQueue queueForTargetTask; 
          lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); 

          // ... and if we found one, run it 
          if (targetTask != null) queueForTargetTask.ExecuteTask(targetTask); 
         } 

         if (_awaitWrappedTasks) 
         { 
          var targetTaskType = targetTask.GetType(); 
          if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0])) 
          { 
           dynamic targetTaskDynamic = targetTask; 
           // Here we wait for the completion of the proxy task. 
           // We do not wait for the proxy task directly, because that would result in that Wait() will throw the exception of the wrapped task (if one existed) 
           // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash) 
           TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously).Wait(); 
          } 
         } 
        } 
       } 
       catch (ThreadAbortException) 
       { 
        // If we received a thread abort, and that thread abort was due to shutting down 
        // or unloading, let it pass through. Otherwise, reset the abort so we can 
        // continue processing work items. 
        if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload()) 
        { 
         Thread.ResetAbort(); 
        } 
       } 
      } 
     } 
     catch (OperationCanceledException) { } 
    } 
    finally 
    { 
     // Run a cleanup routine if there was one 
     if (threadFinally != null) threadFinally(); 
     _taskProcessingThread.Value = false; 
    } 
} 

我已經測試這個修改後的版本,它提供了所需的輸出。這種技術也可以用於其他任何調度器。例如。 LimitedConcurrencyLevelTaskSchedulerOrderedTaskScheduler

+0

等待調度程序中的任務將破壞異步IO的值。如果你不需要異步IO,你可以切換到同步任務主體。 – usr

+0

+1。我在這個問題上學到了很多東西。不完全相信這種解決方案比「AsyncSemaphore」更可取,但我會考慮這一點。 – usr

+0

您正在執行'TaskScheduler'實現中的'async-void'方法嗎?可怕的是,我不知道@StephenCleary帽子對此沒有什麼可說的。 – springy76

0

我認爲實現這個目標是不可能的。一個核心問題似乎是TaskScheduler只能用於運行代碼。但是有些任務不運行代碼,例如IO任務或計時器任務。我不認爲TaskScheduler基礎設施可以用來安排這些。

從的TaskScheduler的角度看,它看起來像這樣:

1. Select a registered task for execution 
2. Execute its code on the CPU 
3. Repeat 

步驟(2)是同步的,這意味着Task要執行必須開始和結束作爲步驟的部分(2)。這意味着這個Task不能做異步IO,因爲那是非阻塞的。從這個意義上說,TaskScheduler只支持阻塞碼。

我想你會得到最好的實施你自己的版本AsyncSemaphore釋放服務員在優先順序,並進行限制。您的異步方法可以以非阻塞方式等待該信號量。所有CPU工作都可以在默認線程池上運行,因此不需要在自定義TaskScheduler內部啓動自定義線程。 IO任務可以繼續使用非阻塞IO。

+0

你在這裏解釋了我已經試過了,它基本上有相同的輸出如原來的問題)。在你的建議中'firstPartTask'在排隊的任務調度器上被調度,但是一旦它碰到第一個'await'並且調度器簡單地執行隊列中的下一個「第一部分」,即使前面的「內部任務」(在第一次「等待」之後重置任務)尚未完成。我只能認爲這將通過一個**調度程序**來解決,這個調度程序處理我正在尋找的這個場景,並且無法通過調度程序外的某些魔法來解決。 –

+0

我來相信你是對的。我添加了一些想法和建議。請讓我知道你在想什麼。 – usr

+0

感謝您的更新。您使用信號量鎖的建議正是用戶在以下[答案](http://stackoverflow.com/a/13379980/1514235)中建議的內容(請參閱我的意見)。您建議調度程序僅同步執行其任務的方式有些不正確,但如果調度程序在執行任何其他任務之前等待每個任務的「包裝」任務,該怎麼辦?我認爲這給了我一個想法...謝謝(如果我想出點什麼,會讓你知道)。 –

3

不幸的是,這不能用TaskScheduler解決,因爲他們總是在Task水平工作,以及async方法幾乎總是包含多個Task秒。

您應該將SemaphoreSlim與優先級調度程序結合使用。或者,您可以使用AsyncLock(這也包含在我的AsyncEx library中)。

class Program 
{ 
    private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1); 
    private static TaskScheduler ts_priority1; 
    private static TaskScheduler ts_priority2; 
    private static SemaphoreSlim semaphore = new SemaphoreSlim(1); 
    static void Main(string[] args) 
    { 
    ts_priority1 = queueScheduler.ActivateNewQueue(1); 
    ts_priority2 = queueScheduler.ActivateNewQueue(2); 

    QueueValue(1, ts_priority2); 
    QueueValue(2, ts_priority2); 
    QueueValue(3, ts_priority2); 
    QueueValue(4, ts_priority1); 
    QueueValue(5, ts_priority1); 
    QueueValue(6, ts_priority1); 

    Console.ReadLine();   
    } 

    private static Task QueueTask(Func<Task> f, TaskScheduler ts) 
    { 
    return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts).Unwrap(); 
    } 

    private static Task QueueValue(int i, TaskScheduler ts) 
    { 
    return QueueTask(async() => 
    { 
     await semaphore.WaitAsync(); 
     try 
     { 
     Console.WriteLine("Start {0}", i); 
     await Task.Delay(1000); 
     Console.WriteLine("End {0}", i); 
     } 
     finally 
     { 
     semaphore.Release(); 
     } 
    }, ts); 
    } 
} 
+1

這看起來像一個有趣的解決方案。但是,我看到了這個問題。儘管解決方案(首先)會導致正確的輸出(如在這個問題中),但它會破壞已執行任務的優先級。調度程序將執行所有任務(以正確的優先級),直到'await semaphore.WaitAsync()',但具有較高優先級的任務將不會從優先級較低的任務之前的鎖中釋放。如果在優先級較低的任務(仍在等待從鎖釋放)之後安排更高優先級的任務,則尤其如此。 –

+0

在這種情況下,您將需要一個實際的基於優先級的鎖定,因爲AFAIK沒有其他人需要鎖定,所以這個鎖定不存在。你必須建立自己的。 –

+0

我已經添加了我自己的[答案](http://stackoverflow.com/a/13414364/1514235)。請看看你的想法。 –

相關問題