2013-08-21 31 views
3

我有一種方法返回IObservable<long>,我從async方法調用。我想將其轉換爲正常的List<long>,但如果我的CancellationToken被髮信號,那麼該操作將被取消。如何將IObservable <T>轉換爲支持CancellationToken的RX列表

我願做這樣的事情:

List<long> result = await Foo().ToList(myCancellationToken); 

什麼是實現這個正確的(簡單的)的方法是什麼? IObservable<T>ToList()擴展方法返回IObservable<List<T>>,並且不帶CancellationToken參數。

回答

5
var list = await Foo().ToList().ToTask(cancellationToken); 

這有如果令牌被取消立即取消的優勢(對方的回答不會取消下一次Foo觀察的產生值,直到)。

+0

正是我害怕與其他答案。這似乎很好地訣竅。謝謝! – DeCaf

+0

我在回答中添加了一個擴展方法''TakeWhile(CancellationToken t)'',以確保完成發生的早期而不是下一個事件。在某些情況下它可能更有用。然而,上述答案比我編輯前的最初嘗試更正確。 – bradgonesurfing

+0

我肯定會更喜歡使用內置的Rx擴展方法(這個答案),而不是創建新的(另一個答案)。如果你正在尋找一個「更可組合的」解決方案,那麼我會傾向於'CancellationDisposable'類型和更多的內置類型。 –

3

使用TakeWhile來終止列表。

CancellationToken MyToken = ... 
var list = await Foo().TakeWhile(v=>!MyToken.IsCancellationRequested).ToList(); 

如果您擔心訂閱只有在提供下一項時才取消,您可以使用此擴展方法。

public static IObservable<T> 
TakeWhile<T> 
    (this IObservable<T> This 
    , CancellationToken t 
    ) 
{ 
    var cts = CancellationTokenSource.CreateLinkedTokenSource(t); 
    return Observable.Create<T>((IObserver<T> observer) => 
     { 
      This.Subscribe(observer, cts.Token); 
      return Disposable.Create(() => cts.Cancel()); 
    }); 
} 

,寫

CancellationToken MyToken = ... 
var list = await Foo().TakeWhile(MyToken.IsCancellationRequested).ToList(); 

使用TakeWhile帶有消除令牌比ToTask剛剛返回的最後一個元素更可組合。

+0

看起來不錯,除了這個擴展方法的示例調用不應該是'... TakeWhile(MyToken)'嗎?當然,這比上面的解決方案更普遍,但是如果我做'.ToList()'總是返回一個只有一個元素的'IObservable',不是嗎? – DeCaf

+0

此外,名稱'TakeWhile'在這裏似乎有點誤導,因爲取消令牌取消*時不會取消,否則它似乎是一個很好的解決方案。 (我還有很多要學習RX的東西:) – DeCaf

相關問題