2015-07-20 80 views
0

基於2014年React 2014上的演講,「一切都是流」,看着Reactive Trader源代碼,我想我會嘗試重新編寫一些舊的代碼來遵循這種模式,但我有點難住。Rx GroupBy - 如何獲取分組值?

我有兩種方法具有以下特徵:

public static IObservable<OrderDto> GetOrderStream(string name) 
public static IObservable<PriceDto> GetPriceStream(string exchange, string security) 

兩種方法都使用Observable.Create包裹起來的一些事件,並使用所創建的可觀測Publish()RefCount()

每個OrderDto都包含用於交換和安全性的字段。我想通過Exchange和安全組對訂單進行分組,以便我可以從單獨的流請求定價信息。對於我的最終結果,我想打印每個訂單以及訂單中交換/安全性的當前定價。

對於訂單我有以下幾點:

var orders = Observable.Defer(() => GetOrderStream("FNZCTEST")) 
               .GroupBy(o => new { o.Exchange, o.Security }) 
               .Publish() 
               .RefCount(); 

如果我使用:

var j = from order in orders 
       from o in order 
       from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize() 
       select new { Order = o, Price = price }; 

IDisposable disposable = j.Subscribe(x => Console.WriteLine("{0} : {1}", x.Order, x.Price.HasValue ? x.Price.Value : new PriceDto())); 

我得到所需的輸出,但GetPriceStream被一再呼籲同一個Exchange /安全性(即不每組一次)。

如果我將其更改爲

var j = from order in orders 
     from price in GetPriceStream(order.Key.Exchange, order.Key.Security).Materialize() 
     select new { Price = price }; 

IDisposable disposable = j.Subscribe(x => Console.WriteLine("{0} : {1}", "", x.Price.HasValue ? x.Price.Value : new PriceDto())); 

然後GetPriceStream是對於每個組我希望調用一次。我的問題是 - 我如何獲得這種期望的行爲並獲得組中每個OrderDto的訪問權限,以便我可以一起輸出訂單和價格。

回答

0

這裏有幾件事情對我沒有意義。我不明白你爲什麼要做.Defer(...),爲什麼你要做.Publish().RefCount()

此外,你正在分組你的結果,然後將它們展平。爲什麼不把它們留在原樣?

最後,看起來您正在使用.Materialize()以某種方式處理GetPriceStream未產生實際值的情況。

因此,考慮到所有這一切,這似乎是你所需要的最可能的查詢:

var query = 
    from order in GetOrderStream("FNZCTEST") 
    from price in GetPriceStream(order.Exchange, order.Security) 
     .DefaultIfEmpty(new PriceDto()) 
    select new 
    { 
     Order = order, 
     Price = price, 
    }; 

IDisposable disposable = 
    query.Subscribe(x => Console.WriteLine("{0} : {1}", x.Order, x.Price)); 

現在,它也有可能是你實現Observable.Create也給你造成的悲傷。所以,如果你願意的話,如果你可以在問題的最後添加實現,那將是非常好的。

+0

我使用了Defer(),以便在訂閱時運行該方法。發佈和RefCount,因爲它可能會在多個地方被調用。 GetOrderStream可以爲特定交易所/安全組合返回多個訂單,但我只想從該GetPriceStream中獲取該組合的價格信息,因此也就是分組。使用Materialize()是因爲Create可以拋出OnError,我不想殺掉這個序列。我試圖讓我的腦袋圍繞Rx,但這是一種全新的思維方式,我還沒有完全掌握。所有幫助讚賞。 –

+0

我會檢查是否需要'.Defer()'。使用'.Publish()。RefCount()'只有在你想讓任何後續訂閱者獲得部分流時纔有用 - 第一個訂閱者獲取所有值,但後續訂閱者只能獲得尚未計算的值。不要使用'.Materialize()' - 嘗試使用'.Catch(...)' - 或者更好的辦法是編寫一個lamdba,它在遇到可觀察流之前清理異常。 – Enigmativity