2013-08-07 23 views
1

讓我們考慮的方法:取消令牌無法訪問時如何中止或終止TPL的任務?

Task Foo(IEnumerable items, CancellationToken token) 
{ 
    return Task.Run(() => 
    { 
     foreach (var i in items) 
      token.ThrowIfCancellationRequested(); 

    }, token); 
} 

然後,我有一個消費者:

var cts = new CancellationTokenSource(); 
var task = Foo(Items, cts.token); 
task.Wait(); 

和物品的例子:

IEnumerable Items 
{ 
    get 
    { 
     yield return 0; 
     Task.Delay(Timeout.InfiniteTimeSpan).Wait(); 
     yield return 1; 
    } 
} 

約task.Wait什麼? 我不能把我的取消標記放入項目的集合中。

如何殺死沒有響應的任務或解決這個問題?

+1

爲什麼你不能將你的令牌傳遞給Items()? –

+0

@DaxFohl我將它們作爲參數。在我的問題中,這些項目只是可視化的內容。 –

+0

你可以使cts成爲一個成員變量,並從'Items'中使用它? –

回答

0

其行爲我previous solution是基於樂觀的假設枚舉可能不會掛,是相當快的。因此,我們有時可以犧牲系統線程池的一個線程?正如Dax Fohl指出的那樣,即使任務的父任務已被取消異常終止,該任務仍將處於活動狀態。在這方面,如果幾個集合已被無限期地凍結,那麼這可能會阻塞默認任務調度程序使用的潛在ThreadPool。

因此我已經重構ToCancellable方法:

public static IEnumerable<T> ToCancellable<T>(this IEnumerable<T> @this, CancellationToken token) 
{ 
    var enumerator = @this.GetEnumerator(); 
    var state = new State(); 

    for (; ;) 
    { 
     token.ThrowIfCancellationRequested(); 

     var thread = new Thread(s => { ((State)s).Result = enumerator.MoveNext(); }) { IsBackground = true, Priority = ThreadPriority.Lowest }; 
     thread.Start(state); 

     try 
     { 
      while (!thread.Join(10)) 
       token.ThrowIfCancellationRequested(); 
     } 
     catch (OperationCanceledException) 
     { 
      thread.Abort(); 
      throw; 
     } 

     if (!state.Result) 
      yield break; 

     yield return enumerator.Current; 
    } 
} 

而一個幫助類來管理結果:

class State 
{ 
    public bool Result { get; set; } 
} 

它是安全的中止分離線程。

我在這裏看到的痛苦是一個很重的線創作。這可以通過使用自定義線程池以及生產者 - 使用者模式來解決,該模式能夠處理異常中斷以便從池中刪除斷開的線程。

另一個問題是在連接線。什麼是最好的暫停?也許這應該是用戶負責,並作爲方法論點。

1

爲什麼不能通過CancellationToken到Items()

IEnumerable Items(CancellationToken ct) 
{ 
    yield return 0; 
    Task.Delay(Timeout.InfiniteTimeSpan, ct).Wait(); 
    yield return 1; 
} 

你將不得不同樣傳遞給Items()爲你傳遞給Foo(),當然。

0

如何在enumerable周圍創建一個包裝,它本身可以在物品之間取消?

IEnumerable<T> CancellableEnum<T>(IEnumerable<T> items, CancellationToken ct) { 
    foreach (var item in items) { 
     ct.ThrowIfCancellationRequested(); 
     yield return item; 
    } 
} 

......雖然這似乎是什麼Foo()已經做了什麼。如果你有一些地方可以無限地阻止這個枚舉(並且它不是很慢),那麼你要做的是在用戶端的task.Wait()上添加一個超時和/或取消標記。

+2

如果'Items'在'Task.Delay()' –

+0

被阻止的情況下無法使用...因此「如果它不會無限地阻止」部分。 ;) – pkt

1

嘗試使用TaskCompletionSource並返回。如果內部任務運行到完成(或故障),則可以將TaskCompletionSource設置爲內部任務的結果(或錯誤)。但是如果CancellationToken被觸發,您可以將其設置爲立即取消。

Task<int> Foo(IEnumerable<int> items, CancellationToken token) 
{ 
    var tcs = new TaskCompletionSource<int>(); 
    token.Register(() => tcs.TrySetCanceled()); 
    var innerTask = Task.Factory.StartNew(() => 
    { 
     foreach (var i in items) 
      token.ThrowIfCancellationRequested(); 
     return 7; 
    }, token); 
    innerTask.ContinueWith(task => tcs.TrySetResult(task.Result), TaskContinuationOptions.OnlyOnRanToCompletion); 
    innerTask.ContinueWith(task => tcs.TrySetException(task.Exception), TaskContinuationOptions.OnlyOnFaulted); 
    return tcs.Task; 
} 

事實上這並不會殺死內的任務,但它會給你一個任務,你可以從馬上繼續取消。爲了消除內部任務,因爲它正在無限超時,所以我相信你唯一可以做的就是抓住對你開始任務的Thread.CurrentThread的引用,然後從Foo中調用taskThread.Abort(),這當然是不好的做法。但在這種情況下,你的問題真的歸結爲「我怎樣才能使一個長時間運行的函數終止而無需訪問代碼」,這隻能通過Thread.Abort來實現。

1

你可以有項目是IEnumerable<Task<int>>,而不是IEnumerable<int>?然後,你可以做

return Task.Run(() => 
{ 
    foreach (var task in tasks) 
    { 
     task.Wait(token); 
     token.ThrowIfCancellationRequested(); 
     var i = task.Result; 
    } 
}, token); 

雖然這樣的事情可能會更直接使用無框架,做items.ToObservable做。這將是這樣的:

static Task<int> Foo(IEnumerable<int> items, CancellationToken token) 
{ 
    var sum = 0; 
    var tcs = new TaskCompletionSource<int>(); 
    var obs = items.ToObservable(ThreadPoolScheduler.Instance); 
    token.Register(() => tcs.TrySetCanceled()); 
    obs.Subscribe(i => sum += i, tcs.SetException,() => tcs.TrySetResult(sum), token); 
    return tcs.Task; 
} 
3

我找到了一個解決方案,允許把取消標記爲從THID方發起的項目:

public static IEnumerable<T> ToCancellable<T>(this IEnumerable<T> @this, CancellationToken token) 
{ 
    var enumerator = @this.GetEnumerator(); 

    for (; ;) 
    { 
     var task = Task.Run(() => enumerator.MoveNext(), token); 
     task.Wait(token); 

     if (!task.Result) 
      yield break; 

     yield return enumerator.Current; 
    } 
} 

現在我需要使用:

Items.ToCancellable(cts.token) 

取消請求後不會掛起。

+0

這似乎是一個很好的解決方案。我測試了它,它似乎很好地工作。 –

+1

我同意,它比任何一個都更靈活。請注意,它仍然會將任務打開(傳遞令牌'task.Wait'實際上不會終止任務;它只是取消了等待)。爲了解決這個問題,你需要'Thread.Abort'。 –

+0

@DaxFohl你說得對。該任務仍在運行。最重要的是,由於先前取消的例外情況,它不會對可撤銷線程產生更多影響。即使我們將無限等待替換爲某個小值,那麼只會執行下一個收益回報,其結果將被忽略。 –

2

您無法真正取消不可取消的操作。 Stephen Toub在Parallel FX Team的博客中詳細介紹了「How do I cancel non-cancelable async operations?」,但其實質是您需要了解您真正想要做什麼?

  1. 本身停止異步/長時間運行?如果您沒有辦法指示操作,則無法以合作的方式執行
  2. 停止等待操作完成,忽略任何結果?這是可行的,但由於顯而易見的原因會導致不可靠性。您可以使用傳遞取消標記的長操作來啓動任務,也可以像Stephen Toub所描述的那樣使用TaskCompletionSource。

您需要決定要找到合適的解決方案