2013-07-04 64 views
7

我有一個async方法是讀取流中的長時間運行的方法,並在找到的東西觸發一個事件:轉換異步方法返回的IObservable <>

public static async void GetStream(int id, CancellationToken token) 

這需要取消標記,因爲它是創造一項新任務。在內部調用await時,它讀取數據流:

var result = await sr.ReadLineAsync() 

現在,我想將其轉換爲一個返回的IObservable <>,使我可以利用此功能反應擴展的方法。從我讀過,要做到這一點的最好辦法是使用Observable.Create,由於RX 2.0現在還支持異步I可以得到這一切的是這樣工作的:

public static IObservable<Message> ObservableStream(int id, CancellationToken token) 
{ 
    return Observable.Create<Message>(
     async (IObserver<Message> observer) => 
      { 

代碼內的其餘是一樣的,但我沒有打電話給我打電話observer.OnNext()。但是,這感覺不對。首先,我在裏面混合了CancellationTokens,儘管添加了async關鍵字使它起作用,但這實際上是最好的做法嗎?我打電話給我的ObservableStream這樣的:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m)); 
+1

你應該幾乎從來不使用'異步void',肯定不會在圖書館的方法。 – svick

回答

1

你應該改變,而不是無效GetStream返回一個任務,(返回異步無效並不好,當絕對需要的,除了作爲svick評論)。一旦你返回一個任務,你可以調用.ToObservable(),你就完成了。

例如:

public static async Task<int> GetStream(int id, CancellationToken token) { ... } 

然後,

GetStream(1, new CancellationToken(false)) 
    .ToObservable() 
    .Subscribe(Console.Write); 
+0

我覺得'GetStream()'多次觸發一個事件,所以'任務'是不夠的。 – svick

+0

是的,簽名是一個例子,但在編輯中,我暗示通過直接使用ToObservable()來擺脫GetStream。我正在更新我的答案並做了一些澄清。 –

+0

啊對,謝謝。是的,GetStream永遠運行並在發現消息時觸發事件。所以,如果我用'任務'相反,會很有意義嗎?我會在那裏放棄其他事情嗎? –

11

你是正確的。一旦通過IObservable表示您的界面,您應該避免要求呼叫者提供CancellationToken。這並不意味着你不能在內部使用它們。 Rx提供了幾種機制來生成CancellationToken實例,當觀察者取消訂閱您的可觀察項時,這些實例將被取消。

有很多方法可以解決您的問題。最簡單的代碼幾乎不需要改變。它採用的Observable.Create一個重載提供你一個CancellationToken如果主叫方取消訂閱觸發:

public static IObservable<Message> ObservableStream(int id) 
{ 
    return Observable.Create<Message>(async (observer, token) => 
    { 
     // no exception handling required. If this method throws, 
     // Rx will catch it and call observer.OnError() for us. 
     using (var stream = /*...open your stream...*/) 
     { 
      string msg; 
      while ((msg = await stream.ReadLineAsync()) != null) 
      { 
       if (token.IsCancellationRequested) { return; } 
       observer.OnNext(msg); 
      } 
      observer.OnCompleted(); 
     } 
    }); 
} 
+0

不應該'if(token.IsCancellationRequested){return; }'是'if(token.IsCancellationRequested){break; }'這樣'OnCompleted'被調用? – TiMoch

+0

@TiMoch在這種情況下,觀察者已經取消訂閱,不需要向他們發送任何其他通知 – Brandon

相關問題