我做了一個擴展方法:具有無限序列是真的荏苒則始終爲假
public static IObservable<T> RateLimit<T>(this IObservable<T> source,
TimeSpan minDelay)
{
return
source.TimeInterval()
.Select(
(x, i) =>
Observable.Return(x.Value)
.Delay(i == 0
? TimeSpan.Zero
: TimeSpan.FromTicks(
Math.Max(minDelay.Ticks - x.Interval.Ticks, 0))))
.Concat();
}
這將創建一個新的觀察到的,只有通過與時間的最小間隔讓項目。
要消除初始等待時間,有必要以不同的方式處理第一個項目。
可以看出,有一個測試,看看我們是通過測試i == 0
來處理第一個項目。這裏的問題是如果我們處理超過int.MaxValue
項目,這將失敗。
相反,我想到了以下順序
var trueThenFalse = Observable.Return(true)
.Concat(Observable.Repeat(false))
,並拉上它在靠我的源:
source.TimeInterval().Zip(trueThenFalse, ...
但通過這個無限序列拉鍊時,我們似乎進入一個死循環其中trueThenFalse
一次發出所有物品(無限)。失敗。
我可以很容易地編寫這個帶有副作用的代碼(例如在外部範圍內的bool
),但這會代表我不滿意的純度損失。
有什麼建議嗎?
編輯
雖然不太一樣的行爲,下面的代碼顯示出一些討厭的特質
var trueThenFalse = Observable.Return(true)
.Concat(Observable.Repeat(false));
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x));
,並最終死亡與OOME。這是因爲trueThenFalse
似乎取消了所有的值,但它們並未被Zip及時消耗。
似乎是trueThenFalse和郵編我工作。 – Evk
因此,這可能與延續計劃的方式有關?鑑於'IObservable'只提供'Subscribe'方法,所以我很難理解無限(冷)序列在訂閱這個「push」模型時不會立即嘗試解除其所有項目的解除操作。 – spender
但zip從兩個序列中獲取下一個元素。首先它從第一個序列中獲得「真實」,從第二個(沒有延遲)中獲得第一個值。然後它從第二個序列中獲得「假」和第二個值,但現在有一個延遲。不知道爲什麼它應該立即對所有物品進行整理也許你可以用trueThenFalse和Zip發佈代碼,你說失敗了? – Evk