我認爲使用一個可觀察的立即返回的和同樣使用async/await語法,因爲rasx在註釋中做的事情太混亂了。
讓我們創建一個流與再來一個5個元素每一秒,然後完成:
private IObservable<long> StreamWith5Elements()
{
return Observable.Interval(TimeSpan.FromSeconds(1))
.Take(5);
}
我們可以使用異步調用它/等待魔術在本LINQPad友好例如:
void Main()
{
CountExampleAsync().Wait();
}
private async Task CountExampleAsync()
{
int result = await StreamWith5Elements().Count();
Console.WriteLine(result);
}
但它誤導了這裏發生了什麼 - Count()
返回IObservable<int>
,但Rx是超友好的await
並將該結果流轉換爲Task<int>
- 並等待然後交回那個任務的結果int
。
當您對IObservable<T>
使用await時,您隱含地說您希望observable用單個結果調用OnNext()
,然後調用OnComplete()
。實際情況是,您將得到一個Task<T>
,該值返回流終止前發送的最後一個值。 (類似於AsyncSubject<T>
的行爲)。
這很有用,因爲它意味着任何流都可以映射到Task
,但它確實需要仔細考慮。
現在,上面的例子是等效於下面更傳統的Rx:
void Main()
{
PlainRxCountExample();
}
private void PlainRxCountExample()
{
IObservable<int> countResult = StreamWith5Elements().Count();
countResult.Subscribe(count => Console.WriteLine(count));
/* block until completed for the sake of the example */
countResult.Wait();
}
在這裏你可以看到Count()
確實返回int流 - 提供異步計數。它只會在源碼流完成時纔會返回。
在Rx的早期,Count()實際上是同步的。
但是,這並不是一個非常有用的狀態,因爲它「退出Monad」 - 即將您帶出IObservable<T>
,並阻止您與Rx操作員進一步組合。
一旦你開始「思考流」,Count()的異步特性真的很直觀,因爲當然你只能在流完成時提供一個流數 - 以及爲什麼要這樣做? :)
你可以......嘗試一下。 – Magus
呃哦:看起來這是工作:'var count = await(Observable.Return(「Hello world!」))。Count();' –
rasx