2017-01-10 30 views
2

我正在使用GroupByUntil對MSMQ中具有特定屬性值的消息進行分組,這些屬性值工作得非常好。我正在使用此代碼。RX GroupByUntil with sliding直到

observable.GroupByUntil(
    message => message.Source, 
    message => message.Body, 
    message => Observable.Timer(new TimeSpan(0,0,5)) //I thought this was sliding expiration 
).Subscribe(HandleGroup); 

我誤以爲每一次新郵件到達給定組,該組的durationSelector將重啓,基本上等待時間,以結束組之前沒有新的消息傳遞。我意識到情況並非如此,並且durationSelector無論如何都將繼續倒計時。爲每個組分配滑動durationSelector的最佳方式是什麼?

回答

1

Switch是你的朋友。

observable.GroupByUntil(
    message => message.Source, 
    message => message.Body, 
    group => group 
     .Select(message => Observable.Timer(new TimeSpan(0, 0, 5))) 
     .Switch() 
).Subscribe(HandleGroup); 

說明:

  • 對於每個消息,創建如果另一個信息相同的組內沿着附帶5秒後觸發一次
  • 定時器,刪除舊的計時器,並切換到新的一個。
+0

「Switch - 將一個Observable轉換爲一個Observable放入一個Observable,它發出這些Observables中最近發出的項目」hahahah,what!?。從來沒有破譯過。 – sonicblis

+0

Observable操作符的文本解釋通常不足。大理石圖更有價值。 – Shlomo