2013-09-25 44 views
12

是否存在推薦建立的自我取消和重新啓動任務的模式?自我取消和重新啓動任務的模式

例如,我正在開發背景拼寫檢查程序的API。拼寫檢查會議包裝爲Task。每個新會話都應該取消前一個會話並等待終止(正確地重新使用拼寫檢查服務提供程序等資源)。

我想出這樣的事情:

class Spellchecker 
{ 
    Task pendingTask = null; // pending session 
    CancellationTokenSource cts = null; // CTS for pending session 

    // SpellcheckAsync is called by the client app 
    public async Task<bool> SpellcheckAsync(CancellationToken token) 
    { 
     // SpellcheckAsync can be re-entered 
     var previousCts = this.cts; 
     var newCts = CancellationTokenSource.CreateLinkedTokenSource(token); 
     this.cts = newCts; 

     if (IsPendingSession()) 
     { 
      // cancel the previous session and wait for its termination 
      if (!previousCts.IsCancellationRequested) 
       previousCts.Cancel(); 
      // this is not expected to throw 
      // as the task is wrapped with ContinueWith 
      await this.pendingTask; 
     } 

     newCts.Token.ThrowIfCancellationRequested(); 
     var newTask = SpellcheckAsyncHelper(newCts.Token); 

     this.pendingTask = newTask.ContinueWith((t) => { 
      this.pendingTask = null; 
      // we don't need to know the result here, just log the status 
      Debug.Print(((object)t.Exception ?? (object)t.Status).ToString()); 
     }, TaskContinuationOptions.ExecuteSynchronously); 

     return await newTask; 
    } 

    // the actual task logic 
    async Task<bool> SpellcheckAsyncHelper(CancellationToken token) 
    { 
     // do not start a new session if the the previous one still pending 
     if (IsPendingSession()) 
      throw new ApplicationException("Cancel the previous session first."); 

     // do the work (pretty much IO-bound) 
     try 
     { 
      bool doMore = true; 
      while (doMore) 
      { 
       token.ThrowIfCancellationRequested(); 
       await Task.Delay(500); // placeholder to call the provider 
      } 
      return doMore; 
     } 
     finally 
     { 
      // clean-up the resources 
     } 
    } 

    public bool IsPendingSession() 
    { 
     return this.pendingTask != null && 
      !this.pendingTask.IsCompleted && 
      !this.pendingTask.IsCanceled && 
      !this.pendingTask.IsFaulted; 
    } 
} 

的客戶端應用程序(用戶界面),就應該能夠調用SpellcheckAsync多次需要,而不必擔心取消掛起的會話。主要的doMore循環在UI線程上運行(因爲它涉及UI,而所有拼寫檢查服務提供者調用都是IO綁定的)。

我覺得有點不舒服,因爲我不得不將API分成兩個版本,分別是SpellcheckAsync和,但我想不出一個更好的方法來做到這一點,它還有待測試。

回答

15

我認爲一般的概念是相當不錯的,雖然我建議你不要使用ContinueWith

我只是把它寫使用常規await,和很多「我在已經運行」的邏輯是沒有必要的:

Task pendingTask = null; // pending session 
CancellationTokenSource cts = null; // CTS for pending session 

// SpellcheckAsync is called by the client app on the UI thread 
public async Task<bool> SpellcheckAsync(CancellationToken token) 
{ 
    // SpellcheckAsync can be re-entered 
    var previousCts = this.cts; 
    var newCts = CancellationTokenSource.CreateLinkedTokenSource(token); 
    this.cts = newCts; 

    if (previousCts != null) 
    { 
     // cancel the previous session and wait for its termination 
     previousCts.Cancel(); 
     try { await this.pendingTask; } catch { } 
    } 

    newCts.Token.ThrowIfCancellationRequested(); 
    this.pendingTask = SpellcheckAsyncHelper(newCts.Token); 
    return await this.pendingTask; 
} 

// the actual task logic 
async Task<bool> SpellcheckAsyncHelper(CancellationToken token) 
{ 
    // do the work (pretty much IO-bound) 
    using (...) 
    { 
     bool doMore = true; 
     while (doMore) 
     { 
      token.ThrowIfCancellationRequested(); 
      await Task.Delay(500); // placeholder to call the provider 
     } 
     return doMore; 
    } 
} 
+0

@Stephen Cleary,我非常尊重你在所有異步事物上的工作,所以請不要誤以爲這是我的好奇心。我有些驚訝,你沒有用'SemaphoreSlim'或你自己的'AsyncLock'或類似的東西重寫'await this.pendingTask'部分。您是否一般認爲提高異步方法的「同步」部分中的線程安全性是一個過早的優化? –

+0

@KirillShlenskiy:使用「SemaphoreSlim」或其他類似的東西來限制每次一次的限制沒有任何問題。 –

5

下面是最近取消和重新起動模式的版本我使用:

class AsyncWorker 
{ 
    Task _pendingTask; 
    CancellationTokenSource _pendingTaskCts; 

    // the actual worker task 
    async Task DoWorkAsync(CancellationToken token) 
    { 
     token.ThrowIfCancellationRequested(); 
     Debug.WriteLine("Start."); 
     await Task.Delay(100, token); 
     Debug.WriteLine("Done."); 
    } 

    // start/restart 
    public void Start(CancellationToken token) 
    { 
     var previousTask = _pendingTask; 
     var previousTaskCts = _pendingTaskCts; 

     var thisTaskCts = CancellationTokenSource.CreateLinkedTokenSource(token); 

     _pendingTask = null; 
     _pendingTaskCts = thisTaskCts; 

     // cancel the previous task 
     if (previousTask != null && !previousTask.IsCompleted) 
      previousTaskCts.Cancel(); 

     Func<Task> runAsync = async() => 
     { 
      // await the previous task (cancellation requested) 
      if (previousTask != null) 
       await previousTask.WaitObservingCancellationAsync(); 

      // if there's a newer task started with Start, this one should be cancelled 
      thisTaskCts.Token.ThrowIfCancellationRequested(); 

      await DoWorkAsync(thisTaskCts.Token).WaitObservingCancellationAsync(); 
     }; 

     _pendingTask = Task.Factory.StartNew(
      runAsync, 
      CancellationToken.None, 
      TaskCreationOptions.None, 
      TaskScheduler.FromCurrentSynchronizationContext()).Unwrap(); 
    } 

    // stop 
    public void Stop() 
    { 
     if (_pendingTask == null) 
      return; 

     if (_pendingTask.IsCanceled) 
      return; 

     if (_pendingTask.IsFaulted) 
      _pendingTask.Wait(); // instantly throw an exception 

     if (!_pendingTask.IsCompleted) 
     { 
      // still running, request cancellation 
      if (!_pendingTaskCts.IsCancellationRequested) 
       _pendingTaskCts.Cancel(); 

      // wait for completion 
      if (System.Threading.Thread.CurrentThread.GetApartmentState() == ApartmentState.MTA) 
      { 
       // MTA, blocking wait 
       _pendingTask.WaitObservingCancellation(); 
      } 
      else 
      { 
       // TODO: STA, async to sync wait bridge with DoEvents, 
       // similarly to Thread.Join 
      } 
     } 
    } 
} 

// useful extensions 
public static class Extras 
{ 
    // check if exception is OperationCanceledException 
    public static bool IsOperationCanceledException(this Exception ex) 
    { 
     if (ex is OperationCanceledException) 
      return true; 

     var aggEx = ex as AggregateException; 
     return aggEx != null && aggEx.InnerException is OperationCanceledException; 
    } 

    // wait asynchrnously for the task to complete and observe exceptions 
    public static async Task WaitObservingCancellationAsync(this Task task) 
    { 
     try 
     { 
      await task; 
     } 
     catch (Exception ex) 
     { 
      // rethrow if anything but OperationCanceledException 
      if (!ex.IsOperationCanceledException()) 
       throw; 
     } 
    } 

    // wait for the task to complete and observe exceptions 
    public static void WaitObservingCancellation(this Task task) 
    { 
     try 
     { 
      task.Wait(); 
     } 
     catch (Exception ex) 
     { 
      // rethrow if anything but OperationCanceledException 
      if (!ex.IsOperationCanceledException()) 
       throw; 
     } 
    } 
} 

試驗使用(僅產生一個單一的 「開始/完成」 爲DoWorkAsync輸出):

private void MainForm_Load(object sender, EventArgs e) 
{ 
    var worker = new AsyncWorker(); 
    for (var i = 0; i < 10; i++) 
     worker.Start(CancellationToken.None); 
} 
+0

此模式的更新和功能版本位於:http://stackoverflow.com/a/21427264/1768303 – Noseratio

0

希望這將是有用的 - 試圖創建可重複使用的Helper類:上述

class SelfCancelRestartTask 
{ 
    private Task _task = null; 
    public CancellationTokenSource TokenSource { get; set; } = null; 

    public SelfCancelRestartTask() 
    { 
    } 

    public async Task Run(Action operation) 
    { 
     if (this._task != null && 
      !this._task.IsCanceled && 
      !this._task.IsCompleted && 
      !this._task.IsFaulted) 
     { 
      TokenSource?.Cancel(); 
      await this._task; 
      TokenSource = new CancellationTokenSource(); 
     } 
     else 
     { 
      TokenSource = new CancellationTokenSource(); 
     } 
     this._task = Task.Run(operation, TokenSource.Token); 
    } 
0

的例子似乎會有問題,因爲異步方法後迅速相互調用多次,例如四個倍。然後,此方法的所有後續調用將取消第一個任務,最後會生成三個同時運行的新任務。所以我想出了這個:

private List<Tuple<Task, CancellationTokenSource>> _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>(); 

    /// <remarks>This method is asynchronous, i.e. it runs partly in the background. As this method might be called multiple times 
    /// quickly after each other, a mechanism has been implemented that <b>all</b> tasks from previous method calls are first canceled before the task is started anew.</remarks> 
    public async void ParameterExtraction() { 

     CancellationTokenSource newCancellationTokenSource = new CancellationTokenSource(); 

     // Define the task which shall run in the background. 
     Task newTask = new Task(() => { 
      // do some work here 
       } 
      } 
     }, newCancellationTokenSource.Token); 

     _parameterExtractionTasks.Add(new Tuple<Task, CancellationTokenSource>(newTask, newCancellationTokenSource)); 

     /* Convert the list to arrays as an exception is thrown if the number of entries in a list changes while 
     * we are in a for loop. This can happen if this method is called again while we are waiting for a task. */ 
     Task[] taskArray = _parameterExtractionTasks.ConvertAll(item => item.Item1).ToArray(); 
     CancellationTokenSource[] tokenSourceArray = _parameterExtractionTasks.ConvertAll(item => item.Item2).ToArray(); 

     for (int i = 0; i < taskArray.Length - 1; i++) { // -1: the last task, i.e. the most recent task, shall be run and not canceled. 
      // Cancel all running tasks which were started by previous calls of this method 
      if (taskArray[i].Status == TaskStatus.Running) { 
       tokenSourceArray[i].Cancel(); 
       await taskArray[i]; // wait till the canceling completed 
      } 
     } 

     // Get the most recent task 
     Task currentThreadToRun = taskArray[taskArray.Length - 1]; 

     // Start this task if, but only if it has not been started before (i.e. if it is still in Created state). 
     if (currentThreadToRun.Status == TaskStatus.Created) { 
      currentThreadToRun.Start(); 
      await currentThreadToRun; // wait till this task is completed. 
     } 

     // Now the task has been completed once. Thus we can recent the list of tasks to cancel or maybe run. 
     _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>(); 
    } 
相關問題