我有作爲的GroupBy預計該工程的一些測試代碼...使用的GroupBy用的毗連只返回第一組值
代碼
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx/groupSize, x => x.x)
.Select(x => x.ToList())
.Replay()
.RefCount();
coreObservable.Subscribe(
x => x.Subscribe(y => Console.WriteLine("Event raised [Books: {0}, Timestamp: {1}]", string.Join("|", y), sw.Elapsed)),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
輸出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.3224002]
Event raised [Values: 6|7|8|9|10, Timestamp: 00:00:00.3268353]
Event raised [Values: 11|12|13|14|15, Timestamp: 00:00:00.3270101]
Event raised [Values: 16|17|18|19|20, Timestamp: 00:00:00.3270803]
Subcription closed
問題是當我嘗試用這個表達式使用Concat時...
代碼
var sw = Stopwatch.StartNew();
int groupSize = 5;
var coreObservable = Observable
.Range(1, 20)
.Select((x, idx) => new { x, idx })
.GroupBy(x => x.idx/groupSize, x => x.x)
.Select(x => x.ToList())
.Concat() // JUST ADDED THIS
.Replay()
.RefCount();
coreObservable.Subscribe(
x => Console.WriteLine("Event raised [Values: {0}, Timestamp: {1}]", string.Join("|", x), sw.Elapsed),
() => Console.WriteLine("Subcription closed"));
coreObservable.Wait(); // blocking until observable completes
輸出
Event raised [Values: 1|2|3|4|5, Timestamp: 00:00:00.2728469]
Event raised [Values: , Timestamp: 00:00:00.2791311]
Event raised [Values: , Timestamp: 00:00:00.2793720]
Event raised [Values: , Timestamp: 00:00:00.2794617]
Subcription closed
通知僅在第一組值露出。
我使用GroupBy而不是Buffer的原因是因爲我試圖用它來創建數據饋送的最大大小塊,這些數據饋送以突發形式出現。原始的observable可能是項目的數組,當單個事件中的項目太多時,我想分割這些數組。
我想使用Concat的原因是因爲我想能夠在陣列事件之間創建延遲,就像很多人推薦here一樣。
問題是我使用熱可觀察事實的一部分?所以也許所有這些事件在concat訂閱之前就已經發生了。 – KrisG
是的,這是確切的問題。所有的觀察對象都是由GroupBy()立即「啓動」的,但Concat只能一次一個地監聽它們。 –