2016-11-22 64 views
4

我做了一個擴展方法:具有無限序列是真的荏苒則始終爲假

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
              TimeSpan minDelay) 
{ 
    return 
     source.TimeInterval() 
      .Select(
       (x, i) => 
        Observable.Return(x.Value) 
         .Delay(i == 0 
          ? TimeSpan.Zero 
          : TimeSpan.FromTicks(
            Math.Max(minDelay.Ticks - x.Interval.Ticks, 0)))) 
      .Concat(); 
} 

這將創建一個新的觀察到的,只有通過與時間的最小間隔讓項目。

要消除初始等待時間,有必要以不同的方式處理第一個項目。

可以看出,有一個測試,看看我們是通過測試i == 0來處理第一個項目。這裏的問題是如果我們處理超過int.MaxValue項目,這將失敗。

相反,我想到了以下順序

var trueThenFalse = Observable.Return(true) 
        .Concat(Observable.Repeat(false)) 

,並拉上它在靠我的源:

source.TimeInterval().Zip(trueThenFalse, ... 

但通過這個無限序列拉鍊時,我們似乎進入一個死循環其中trueThenFalse一次發出所有物品(無限)。失敗。

我可以很容易地編寫這個帶有副作用的代碼(例如在外部範圍內的bool),但這會代表我不滿意的純度損失。

有什麼建議嗎?

編輯

雖然不太一樣的行爲,下面的代碼顯示出一些討厭的特質

var trueThenFalse = Observable.Return(true) 
    .Concat(Observable.Repeat(false)); 
var src = Observable.Interval(TimeSpan.FromSeconds(1)); //never completes 
src.Zip(trueThenFalse,(i,tf)=>tf).ForEach(x=>Trace.TraceInformation("{0}",x)); 

,並最終死亡與OOME。這是因爲trueThenFalse似乎取消了所有的值,但它們並未被Zip及時消耗。

+0

似乎是trueThenFalse和郵編我工作。 – Evk

+0

因此,這可能與延續計劃的方式有關?鑑於'IObservable'只提供'Subscribe'方法,所以我很難理解無限(冷)序列在訂閱這個「push」模型時不會立即嘗試解除其所有項目的解除操作。 – spender

+0

但zip從兩個序列中獲取下一個元素。首先它從第一個序列中獲得「真實」,從第二個(沒有延遲)中獲得第一個值。然後它從第二個序列中獲得「假」和第二個值,但現在有一個延遲。不知道爲什麼它應該立即對所有物品進行整理也許你可以用trueThenFalse和Zip發佈代碼,你說失敗了? – Evk

回答

3

所以事實證明,Zip有another overload可以將一個IObservable序列與IEnumerable序列一起壓縮。

通過將IObservable的推送語義與IEnumerable的拉語義相結合,可以使我的測試用例工作。

所以,用下面的方法:

private IEnumerable<T> Inf<T>(T item) 
{ 
    for (;;) 
    { 
     yield return item; 
    } 
} 

我們可以做出一個IEnumerable:

var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false)); 

,然後用觀察到的源住口:

var src = Observable.Interval(TimeSpan.FromSeconds(1)); 
src.Zip(trueThenFalse, (i, tf) => tf).ForEach(x => Trace.TraceInformation("{0}", x)); 

...一切都按預期工作。

我現在有以下實現我的RateLimiter方法:

public static IObservable<T> RateLimit<T>(this IObservable<T> source, 
              TimeSpan minDelay) 
{ 
    var trueThenFalse = Enumerable.Repeat(true, 1).Concat(Inf(false)); 
    return 
     source.TimeInterval() 
      .Zip(trueThenFalse, (item, firstTime) => Observable.Return(item.Value) 
       .Delay(firstTime 
        ? TimeSpan.Zero 
        : TimeSpan.FromTicks(
         Math.Max(minDelay.Ticks - item.Interval.Ticks, 0)))) 

      .Concat(); 
}