其他東西Observable.Buffer我一直在尋找如何在RX使用Observable.Buffer例子,但找不到任何比鍋爐板時間緩衝的東西更充實。是否有可能比上一次
有似乎是指定一個「bufferClosingSelector」過載,但我不能完成我的腦海裏解決它。
我試圖做的是創建一個按時間或按一個「吸籌」緩衝的序列。 考慮一個請求流,其中每個請求都有一定的權重,而且我不想一次處理超過x累積的權重,或者如果沒有足夠的累積權限,只需要給我上一個時間幀中的內容(常規緩衝區功能)
其他東西Observable.Buffer我一直在尋找如何在RX使用Observable.Buffer例子,但找不到任何比鍋爐板時間緩衝的東西更充實。是否有可能比上一次
有似乎是指定一個「bufferClosingSelector」過載,但我不能完成我的腦海裏解決它。
我試圖做的是創建一個按時間或按一個「吸籌」緩衝的序列。 考慮一個請求流,其中每個請求都有一定的權重,而且我不想一次處理超過x累積的權重,或者如果沒有足夠的累積權限,只需要給我上一個時間幀中的內容(常規緩衝區功能)
bufferClosingSelector
是一個名爲每次獲得可觀察到的,當預計緩衝被關閉,這將產生一個值的功能。
例如,
source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))
作品像普通Buffer(time)
超載。
在要重量序列,您可以通過序列應用Scan
,然後在你的聚合條件決定。
例如,source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
讓你產生當源序列加起來超過100
您可以使用Amb
比賽這兩個成交條件,看看哪一個值序列第一反應:
.Buffer(() => Observable.Amb
(
Observable.Timer(TimeSpan.FromSeconds(1)),
source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
)
)
可以使用任何系列組合子的產生用於緩衝在該點處被關閉的任何值。
注意: 關閉選擇器給出的值並不重要 - 這是重要的通知。因此,要將不同類型的來源與Amb
組合起來,只需將其更改爲System.Reactive.Unit
即可。
Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())
只是一個簡單的說明,當源是其他類型的可觀察類型時,amb似乎不起作用 – Dmitry 2012-03-06 21:05:19
@Dmitry我只是給出了基本的想法。我編輯過它以包含不同類型的示例。 – Asti 2012-03-07 06:01:38
是否可以從觀察者訪問緩衝區關閉值?例如。時間戳緩衝區用於關閉。 – liang 2015-07-17 19:49:07