2015-02-08 70 views
3

我試圖創建一個TaskScheduler,它按順序運行所有任務,但只會「完成」最近計劃的任務。例如,如果我使用它來計劃任務A,那麼在它完成計劃任務B和C之前,我只希望C被認爲是成功的。 A可以繼續工作,但在完成時應被視爲「取消」,B應在開始前被標記爲取消。取消TaskScheduler中的TPL任務

我已經有了在線程池上按順序執行委託的現有代碼,並且管理最多有2個排隊任務的想法 - 一個當前正在執行,另一個是下一個。缺少的部分是能夠將任務的結果狀態設置爲取消

不幸的是,從TaskScheduler內部看來,您實際上幾乎不能訪問Task或任何CancellationToken的狀態。

我試圖通過跟蹤最後排隊的任務來解決這個問題,並且在執行任務時拋出TaskCancelledException,如果它不等於最後排隊的任務,但那不會似乎工作。我想這是因爲這個異常不會被拋入任務的委託中,而所有'魔術'實際上都是在TryExecuteTask()內部處理的。

下面是我有:

public class CurrentPendingTaskScheduler : TaskScheduler 
     { 
     private readonly ThreadSafeCurrentPendingQueueProcessor<Task> _Processor; 
     private Task _LastTask; 

     public CurrentPendingTaskScheduler() 
      { 
      _Processor = new ThreadSafeCurrentPendingQueueProcessor<Task>(); 
      _Processor.Process += _Processor_Process; 
      } 

     private void _Processor_Process(Task obj) 
      { 
      // If there's a newer task already, cancel this one before starting 
      if (obj != _LastTask) 
       throw new TaskCanceledException(obj); 

      TryExecuteTask(obj); 

      // If a newer task was added whilst we worked, cancel this one 
      if (obj != _LastTask) 
       throw new TaskCanceledException(obj); 
      } 

     protected override void QueueTask(Task task) 
      { 
      _LastTask = task; 
      _Processor.Enqueue(task); 
      } 

     protected override Boolean TryExecuteTaskInline(Task task, Boolean taskWasPreviouslyQueued) 
      { 
      return false; 
      } 

     protected override IEnumerable<Task> GetScheduledTasks() 
      { 
      throw new NotImplementedException(); 
      } 
     } 

ThreadSafeCurrentPendingQueueProcessor<>類是通過事件回調,以處理在單個後臺線程排隊的項目,只允許一個活動項目,一個懸而未決的一個幫手項目。

如果'最後的任務'在處理器回調之前發生了變化,則異常只會阻止任務運行(但不會影響其狀態)。如果回調確實運行,但在此期間「最後的任務」已經改變,那麼在任何延續已經開始之後,異常就會被拋得太晚。

另外我不確定是否有這個原因,但是第一次使用調度程序(我爲每個UI元素單擊安排一個任務),QueueTask被調用一次,並帶有新任務。然而,對於後續的每個調度,它都被調用兩次。這會讓事情變得更糟,因爲_LastTask被覆蓋。

我覺得TaskCompletionSource<>可能有一定的用處,但看不出如何。

是否可以實現按照描述工作的TaskScheduler?我知道我可以在調度程序之外實現這種行爲,也就是在創建任務的時候,但我需要在很多地方使用它,並且試圖通過將它放入可重用的調度程序中來使生活更輕鬆。

+2

我不認爲你可以做到這一點。 「TaskScheduler」的工作是決定什麼時候,什麼時候執行一個Task,但不知道結果會是什麼。 – svick 2015-05-06 21:36:56

回答

0

我會創建一個helper類,它接受一個輸入操作,啓動它,取消現有的操作,並覆蓋它的內部變量。由於您不能直接在Task<T>上執行Cancel(),因此您需要保留自己的TaskCancellationSource方便。如果您想提供外部令牌,則可以將它們與CancellationTokenSource.CreateLinkedTokenSource(...)結合使用。如果您需要密切關注結果,那麼這將成爲TaskCompletionSource的良機。

public class OverwriteTaskHandler<T> 
{ 
    private Task<T> _task; 
    private TaskCompletionSource<T> _tcs; 
    private CancellationTokenSource _cts; 

    public OverwriteTaskHandler(Func<T> operation) 
    { 
     _tcs = new TaskCompletionSource<T>(); 
     _cts = new CancellationTokenSource(); 
     TryPushTask(operation); 
    } 

    public bool TryPushTask(Func<T> operation) 
    { 
     if (_tcs.Task.IsCompleted) 
      return false; //It would be unsafe to use this instance as it is already "finished" 
     _cts.Cancel(); 
     _cts = new CancellationTokenSource(); 
     _task = Task.Run(operation, _cts.Token); 
     _task.ContinueWith(task => _tcs.SetResult(task.Result)); 
     return true; 
    } 

    public void Cancel() 
    { 
     _cts.Cancel(); 
    } 

    public Task<T> WrappedTask { get { return _tcs.Task; } } 
} 

Discalimer:我沒有測試過這一點,所以才仔細檢查!

0

AFAIK TaskScheduler不能用於完成這項工作。你真的需要別的東西。例如,你可以自己寫一個輔助類有以下用途:

static CurrentPendingTaskContext ctx = ...; 

async Task MyAsyncFunc() { 
await ctx.RegisterAndMaybeCancel(); 
try { 
    //rest of method 
} 
finally { 
    ctx.NotifyCompletion(); 
} 
} 

RegisterAndMaybeCancel會等到當前運行的任務就完成了。如果這個特定的任務已經被另一個任務取代,它會拋出取消異常。

我現在沒有時間來實現這個類(儘管它很誘人)。但我認爲這種語法模式非常簡單,您可以在很多地方使用它。

您還可以使用IDisposable模式這個擺脫了最後:

async Task MyAsyncFunc() { 
using (await ctx.RegisterAndMaybeCancel()) 
{ 
     //rest of method 
} 
}