基於2014年React 2014上的演講,「一切都是流」,看着Reactive Trader源代碼,我想我會嘗試重新編寫一些舊的代碼來遵循這種模式,但我有點難住。Rx GroupBy - 如何獲取分組值?
我有兩種方法具有以下特徵:
public static IObservable<OrderDto> GetOrderStream(string name)
public static IObservable<PriceDto> GetPriceStream(string exchange, string security)
兩種方法都使用Observable.Create
包裹起來的一些事件,並使用所創建的可觀測Publish()
和RefCount()
。
每個OrderDto都包含用於交換和安全性的字段。我想通過Exchange和安全組對訂單進行分組,以便我可以從單獨的流請求定價信息。對於我的最終結果,我想打印每個訂單以及訂單中交換/安全性的當前定價。
對於訂單我有以下幾點:
var orders = Observable.Defer(() => GetOrderStream("FNZCTEST"))
.GroupBy(o => new { o.Exchange, o.Security })
.Publish()
.RefCount();
如果我使用:
var j = from order in orders
from o in order
from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
select new { Order = o, Price = price };
IDisposable disposable = j.Subscribe(x => Console.WriteLine("{0} : {1}", x.Order, x.Price.HasValue ? x.Price.Value : new PriceDto()));
我得到所需的輸出,但GetPriceStream
被一再呼籲同一個Exchange /安全性(即不每組一次)。
如果我將其更改爲
var j = from order in orders
from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize()
select new { Price = price };
IDisposable disposable = j.Subscribe(x => Console.WriteLine("{0} : {1}", "", x.Price.HasValue ? x.Price.Value : new PriceDto()));
然後GetPriceStream
是對於每個組我希望調用一次。我的問題是 - 我如何獲得這種期望的行爲並獲得組中每個OrderDto的訪問權限,以便我可以一起輸出訂單和價格。
我使用了Defer(),以便在訂閱時運行該方法。發佈和RefCount,因爲它可能會在多個地方被調用。 GetOrderStream可以爲特定交易所/安全組合返回多個訂單,但我只想從該GetPriceStream中獲取該組合的價格信息,因此也就是分組。使用Materialize()是因爲Create可以拋出OnError,我不想殺掉這個序列。我試圖讓我的腦袋圍繞Rx,但這是一種全新的思維方式,我還沒有完全掌握。所有幫助讚賞。 –
我會檢查是否需要'.Defer()'。使用'.Publish()。RefCount()'只有在你想讓任何後續訂閱者獲得部分流時纔有用 - 第一個訂閱者獲取所有值,但後續訂閱者只能獲得尚未計算的值。不要使用'.Materialize()' - 嘗試使用'.Catch(...)' - 或者更好的辦法是編寫一個lamdba,它在遇到可觀察流之前清理異常。 – Enigmativity