2017-03-03 27 views
1

我有作爲的GroupBy預計該工程的一些測試代碼...使用的GroupBy用的毗連只返回第一組值

代碼

var sw = Stopwatch.StartNew(); 
int groupSize = 5; 

var coreObservable = Observable 
    .Range(1, 20) 
    .Select((x, idx) => new { x, idx }) 
    .GroupBy(x => x.idx/groupSize, x => x.x) 
    .Select(x => x.ToList()) 
    .Replay() 
    .RefCount(); 

coreObservable.Subscribe(
    x => x.Subscribe(y => Console.WriteLine("Event raised [Books: {0}, Timestamp: {1}]", string.Join("|", y), sw.Elapsed)), 
    () => Console.WriteLine("Subcription closed")); 

coreObservable.Wait(); // blocking until observable completes 

輸出

Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.3224002] 
Event raised [Values: 6|7|8|9|10, Timestamp: 00:00:00.3268353] 
Event raised [Values: 11|12|13|14|15, Timestamp: 00:00:00.3270101] 
Event raised [Values: 16|17|18|19|20, Timestamp: 00:00:00.3270803] 
Subcription closed 

問題是當我嘗試用這個表達式使用Concat時...

代碼

var sw = Stopwatch.StartNew(); 
int groupSize = 5; 

var coreObservable = Observable 
    .Range(1, 20) 
    .Select((x, idx) => new { x, idx }) 
    .GroupBy(x => x.idx/groupSize, x => x.x) 
    .Select(x => x.ToList()) 
    .Concat() // JUST ADDED THIS 
    .Replay() 
    .RefCount(); 

coreObservable.Subscribe(
    x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed), 
    () => Console.WriteLine("Subcription closed")); 

coreObservable.Wait(); // blocking until observable completes 

輸出

Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.2728469] 
Event raised [Values: , Timestamp: 00:00:00.2791311] 
Event raised [Values: , Timestamp: 00:00:00.2793720] 
Event raised [Values: , Timestamp: 00:00:00.2794617] 
Subcription closed 

通知僅在第一組值露出。

我使用GroupBy而不是Buffer的原因是因爲我試圖用它來創建數據饋送的最大大小塊,這些數據饋送以突發形式出現。原始的observable可能是項目的數組,當單個事件中的項目太多時,我想分割這些數組。

我想使用Concat的原因是因爲我想能夠在陣列事件之間創建延遲,就像很多人推薦here一樣。

回答

2

Merge()替換Concat(),它能正常工作。

我相信你的問題的原因是,Concat()將不會開始聽下一個序列,直到當前的一個完成。

的毗連圖:

s1 --0--1--2-| 
s2   -5--6--7--8--| 
r --0--1--2--5--6--7--8--| 

雖然Merge()訂閱了在同一時間的所有子序列,併發布每當任何子出版值的值。

合併圖:

s1 --1--1--1--| 
s2 ---2---2---2| 
r --12-1-21--2| 

所以你的情況,該Concat()預訂來自Select(x => x.ToList())第一IObservable<IList<int>>,公佈值,直到它完成,然後訂閱到下一個序列。 GroupBy()將爲其找到的每個組創建一個新的IGroupedObservable流,但所有IGroupedObservables將同時完成:當基礎流完成時。

因此Concat()偵聽第一個流直到它完成,但是當第一個流完成時,所有其他流都完成了(因爲它們實際上都是相同的序列,只是按鍵分割),所以沒有值它發佈以下序列。

所有的圖都是從here借來的,這對於Rx來說是一個很好的資源,我強烈建議你看看有關各種操作員工作方式的問題。

+0

問題是我使用熱可觀察事實的一部分?所以也許所有這些事件在concat訂閱之前就已經發生了。 – KrisG

+0

是的,這是確切的問題。所有的觀察對象都是由GroupBy()立即「啓動」的,但Concat只能一次一個地監聽它們。 –

0

您的問題,可以減少這樣的事情,這可能是簡單的想起:

var sw = Stopwatch.StartNew(); 
var subject = new Subject<int>(); 

var o2 = subject.Where(i => i % 2 == 0).ToList(); 
var o3 = subject.Where(i => i % 3 == 0).ToList(); 
var o4 = subject.Where(i => i % 4 == 0).ToList(); 

var c = Observable.Concat(o2, o3, o4) 
//  .Replay() 
//  .RefCount() 
//.Replay().RefCount() has no impact here. 
    ; 

c.Subscribe(
    x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed), 
    () => Console.WriteLine("Subcription closed")); 

for(int i = 0; i < 6; i++) 
    subject.OnNext(i); 
subject.OnCompleted(); 

輸出:

Event raised [Values: 0|2|4, Timestamp: 00:00:00.0002278] 
Event raised [Values: , Timestamp: 00:00:00.0002850] 
Event raised [Values: , Timestamp: 00:00:00.0003049] 
Subcription closed 

如果你是大理石圖這些,它會是這樣的:

s :| 
o2 : ------(024)| 
o3 : ------(03) | 
o4 : ------(04) | 

cOut: ------(024)| 
cSub: (So2)------(So3)(So4) 

cSub shows when c subscribes to child observables. cOut shows c's output. 
So2 means subscribe to o2, So3 means subscribe to o3, etc.. 

Concat訂閱了傳遞給它的第一個observable,然後onl噹噹前一個完成時,y訂閱隨後的可觀察值。在我們的例子中,ToList不會遺漏任何東西,直到源完成,當它轉儲整個列表時。所以o2,o3,全部同時完成,但c只訂閱了o2o2完成後,它會嘗試訂閱其他人,但他們已經完成。

至於如何解決它,Merge會工作,但我猜你想在組2之前處理組1,其中Merge會中斷。