2013-03-06 63 views
7

我爲反應式擴展創建了SlidingWindow()運算符,因爲我想輕鬆地監視諸如滾動平均值等事情。作爲一個簡單示例,我想訂閱聽到鼠標事件,但每次有一個事件我想要接收最後三個(而不是等待每三個事件接收最後三個事件)。這就是爲什麼我發現的Window重載似乎並沒有給我開箱即用的需求。無法在Rx中實現滑動窗口

這就是我想出來的。我擔心它可能不是最高效的解決方案,因爲它頻繁列表操作:

public static IObservable<List<T>> SlidingWindow<T>(this IObservable<T> seq, int length) 
{ 
    var seed = new List<T>(); 

    Func<List<T>, T, List<T>> accumulator = (list, arg2) => 
    { 
     list.Add(arg2); 

     if (list.Count > length) 
      list.RemoveRange(0, (list.Count - length)); 

     return list; 
    }; 

    return seq.Scan(seed, accumulator) 
       .Where(list => list.Count == length); 
} 

可以這樣調用:

var rollingSequence = Observable.Range(1, 5).SlidingWindow().ToEnumerable(); 

但是,我很驚訝,而不是接收的預期結果

1,2,3 
2,3,4 
3,4,5 

我收到了效果

2,3,4 
3,4,5 
3,4,5 

任何見解將不勝感激!

回答

5

試試這個 - 我不得不坐在那裏想一想它的相對錶現,但它是 至少 可能爲好,這樣更容易閱讀:

public static IObservable<IList<T>> SlidingWindow<T>(
     this IObservable<T> src, 
     int windowSize) 
{ 
    var feed = src.Publish().RefCount();  
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list 
    return Observable.Zip(
     Enumerable.Range(0, windowSize) 
      .Select(skip => feed.Skip(skip)) 
      .ToArray()); 
} 

試驗檯:

var source = Observable.Range(0, 10); 
var query = source.SlidingWindow(3); 
using(query.Subscribe(Console.WriteLine)) 
{    
    Console.ReadLine(); 
} 

輸出:

ListOf(0,1,2) 
ListOf(1,2,3) 
ListOf(2,3,4) 
ListOf(3,4,5) 
ListOf(4,5,6) 
... 

編輯:順便說一下,我發現自己一直以來都被強迫性地燒燬.Publish().RefCount() ......我不認爲這是嚴格要求。

編輯yzorg:

如果增加像這樣的方法,你會看到運行時行爲更清楚:

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> src, 
    int windowSize) 
{ 
    var feed = src.Publish().RefCount();  
    // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list 
    return Observable.Zip(
    Enumerable.Range(0, windowSize) 
     .Select(skip => 
     { 
      Console.WriteLine("Skipping {0} els", skip); 
      return feed.Skip(skip); 
     }) 
     .ToArray()); 
} 
+0

@blaster沒問題 - 事實上,感謝「讓」我寫出來,因爲自從回答這個問題以來,我自己使用過幾次。 ;) – JerKimball 2013-03-14 22:07:51

+0

我不認爲這是好事。 .Publish(),.Range(0,x)和.Skip() - 當它們結合在一起時,看起來性能很差,特別是O n^2,因爲Skip會一遍又一遍地迭代整個流。例如,您需要迭代30,000個整數才能獲得(10000,10001,10002)。所以你實際上並沒有在內存中保留源流的滑動緩衝區,你必須將整個源流(從時間的開始)保存在內存中,這是我認爲我們正在避免的。 – yzorg 2013-05-17 06:22:43

+0

@yzorg檢查編輯 – JerKimball 2013-05-17 14:37:43

9

使用原始的測試,與3計數的說法,這給了預期的效果:

public static IObservable<IList<T>> SlidingWindow<T>(
    this IObservable<T> source, int count) 
{ 
    return source.Buffer(count, 1) 
       .Where(list => list.Count == count); 
} 

測試是這樣的:

var source = Observable.Range(1, 5); 
var query = source.SlidingWindow(3); 
using (query.Subscribe(i => Console.WriteLine(string.Join(",", i)))) 
{ 

} 

輸出:

1,2,3 
2,3,4 
3,4,5 
6

只是source.Window(count, 1) - 或source.Buffer(count, 1) 這是 「數」 項的窗口/緩衝,由一個滑動。