2013-11-04 28 views
0

是否有示例顯示Observable.Count<TSource> Method實際上是如何工作的?我提出的例子似乎返回一個包含在observable中的計數,而不是預期的計數。是否IEnumerable <T> .Count()實際上適用於IObservable <T>?

例如,我希望1從這個返回:

System.Diagnostics.Debug.WriteLine((Observable.Return<string>("Hello world!")).Count()); 

1在未來返回(因爲,畢竟,這是一個異步序列)?或者我錯過了一些基本的東西?在撰寫本文時,我實際上假設.Count()將返回T的結果,並隨着時間推移而增長,只要結果被推出。真?是。

+4

你可以......嘗試一下。 – Magus

+0

呃哦:看起來這是工作:'var count = await(Observable.Return (「Hello world!」))。Count();' – rasx

回答

5

中的Rx的總運營商合作有點不同於在LINQ - 他們沒有立即返回一個值,他們返回未來結果(即我們只能知道最終計數是一旦完成可觀察到的)。

所以,如果你寫:

Observable.Return("foo").Count().Subscribe(x => Console.WriteLine(x)); 
>>> 1 

因爲,畢竟,這是一個異步序列

這實際上是不完全正確的。在這裏,只要有人打電話給Subscribe,一切都會立即執行。上面的代碼沒有任何異步,沒有額外的線程,所有事情都發生在訂閱上。

4

我認爲使用一個可觀察的立即返回的同樣使用async/await語法,因爲rasx在註釋中做的事情太混亂了。

讓我們創建一個流與再來一個5個元素每一秒,然後完成:

private IObservable<long> StreamWith5Elements() 
{ 
    return Observable.Interval(TimeSpan.FromSeconds(1)) 
        .Take(5); 
} 

我們可以使用異步調用它/等待魔術在本LINQPad友好例如:

void Main() 
{ 
    CountExampleAsync().Wait(); 
} 

private async Task CountExampleAsync() 
{ 
    int result = await StreamWith5Elements().Count(); 
    Console.WriteLine(result); 
} 

但它誤導了這裏發生了什麼 - Count()返回IObservable<int>,但Rx是超友好的await並將該結果流轉換爲Task<int> - 並等待然後交回那個任務的結果int

當您對IObservable<T>使用await時,您隱含地說您希望observable用單個結果調用OnNext(),然後調用OnComplete()。實際情況是,您將得到一個Task<T>,該值返回流終止前發送的最後一個值。 (類似於AsyncSubject<T>的行爲)。

這很有用,因爲它意味着任何流都可以映射到Task,但它確實需要仔細考慮。

現在,上面的例子是等效於下面更傳統的Rx:

void Main() 
{ 
    PlainRxCountExample(); 
} 

private void PlainRxCountExample() 
{ 
    IObservable<int> countResult = StreamWith5Elements().Count(); 
    countResult.Subscribe(count => Console.WriteLine(count)); 

    /* block until completed for the sake of the example */ 
    countResult.Wait(); 
} 

在這裏你可以看到Count()確實返回int流 - 提供異步計數。它只會在源碼流完成時纔會返回。

在Rx的早期,Count()實際上是同步的。

但是,這並不是一個非常有用的狀態,因爲它「退出Monad」 - 即將您帶出IObservable<T>,並阻止您與Rx操作員進一步組合。

一旦你開始「思考流」,Count()的異步特性真的很直觀,因爲當然你只能在流完成時提供一個流數 - 以及爲什麼要這樣做? :)

+0

不是'Observable.Return '非常接近'Task.Factory .StartNew '? Joseph Albahari建議,當我們的流只有一個可觀察點時,我們應該考慮使用'任務'。 – rasx

+1

那麼,Return並不是異步的,但是一般來說任何'IObservable '都是一個'Task '的候選人。這一切都取決於你打算如何使用它。有時候,留在'IObservable '中的成分優勢比轉換爲任務更有用,有時候相反。當你需要一段時間以上的結果時,顯然前者具有優勢。 Rx通過其調度程序還可以更好地對參數化時間進行參數化,這對測試有益。 –

相關問題