2014-11-08 23 views
2

我是一個嘗試構建對我來說似乎很複雜的事情的RX新手。構建一個RX操作符,將一個可觀察到的鍵值對分成多個觀察對象,每個獨立鍵一個

問題出在這裏:我有一個熱門的可觀察值產生鍵值對,比如說< int,foo >。他們沒有特別的順序。輸出應該是源觀察值中每個不同鍵的可觀察值。因此,對於源中的每個值,結果應該是:如果密鑰尚未顯示,新的可觀察值立即生成相應的值。如果之前已經看到密鑰,則相應的值應該由已經存在的相應可觀察值產生。所以:

<1, foo>-<2, foo>-<2, foo>-<1, foo>-<3, foo>-<4, foo>-<1, foo>-<3, foo>x 

Output: 
<1, foo>-------------------<1, foo>-------------------<1, foo>---------x 
---------<2, foo>-<2,foo>----------------------------------------------x 
------------------------------------<3, foo>-------------------<3, foo>x 
---------------------------------------------<4, foo>------------------x 

我與窗口嘗試這一點,但我堅持就如何「檢測」,一個關鍵的已經見到的,然後「路線」的值,以現有的觀測。

謝謝!

回答

4

這正是GroupBy所做的。例如爲:

var subject = new Subject<KeyValuePair<int, string>>(); 

var groups = subject.GroupBy(x => x.Key); 

GroupBy需要,其選擇從一個元素的密鑰一個KeySelector功能。輸出是一串流,其中每個流發出的類型爲IGroupedObservable<int, KeyValuePair<int, string>>。這是IObservable的子類,併爲您提供Key屬性,以便您還可以識別選定的密鑰。

所以,如果下面的代碼添加到上面的例子中,我們只是打印出每一個新流的關鍵:

groups.Subscribe(x => Console.WriteLine(x.Key)); 

subject.OnNext(new KeyValuePair<int, string>(1, "a")); 
subject.OnNext(new KeyValuePair<int, string>(2, "b")); 
subject.OnNext(new KeyValuePair<int, string>(1, "c")); 

我們得到:

1 
2 

哪些是關鍵這兩條流都產生了。兩個證明我們擁有所有的值,如果不是我們使用SelectMany趨於平緩GroupBy的結果,並挖出只值:

groups.SelectMany(x => x).Subscribe(y => Console.WriteLine(y.Value)); 

subject.OnNext(new KeyValuePair<int, string>(1, "a")); 
subject.OnNext(new KeyValuePair<int, string>(2, "b")); 
subject.OnNext(new KeyValuePair<int, string>(1, "c")); 

我們得到的輸出:

a 
b 
c 

預期。

+2

注意'GroupBy'組持續到源流終止 - 所以如果你有很長的運行流,你可能需要'GroupByUntil',它可以基於持續時間選擇器限制每個組的生存期。如果羣組已被持續時間選擇器終止,則帶有該密鑰的新值將導致形成新羣組。 – 2014-11-08 22:27:36

+0

是這正是我需要的。我希望自己並沒有很慢並重新實現太多其他現有的運營商=/ 感謝您的理解! – tempy 2014-11-09 10:48:59

相關問題