2010-09-16 44 views
3

肯定有人在那裏解決這個問題。 想象一下,我有一個班級定期提出一個關於值變化的事件(例如PropertyChanged) 這個價值無非就是金錢。與Rx運行總和

現在,我想利用Rx,以便獲得最近10分鐘增加的總和。例如BufferWithTime沒有幫助,因爲我總是需要最後10分鐘。

任何想法,我可以做到這一點?下面

TIA 馬丁

回答

3

的解決方案涉及保持相關事件數據的狀態,在過去的10分鐘內使用Observable.Scan列表。 State被維護爲一個元組列表,其值爲int(貨幣)和DateTime

var events = Observable.FromEvent<YourEventArgs>(
    h => SomeEvent += h, h => SomeEvent -= h); 
var runningSums = 
    events.Scan(new List<Tuple<int, DateTime>>(), 
       (l, e) => 
       { 
        var now = DateTime.Now; 
        // Add last event data to list. 
        l.Add(Tuple.Create(e.EventArgs.Money, now)); 
        // Return the correct part of the list (everything 
        // from the last ten minutes). 
        return l.Where(t => (now - t.Item2) < 
            TimeSpan.FromMinutes(10)).ToList(); 
       }) 
      .Select(l => l.Sum(t => t.Item1)); 
runningSums.Subscribe(sum => Console.WriteLine(sum)); 

編輯:例子並不爲每個事件返回一個新的列表:

var events = Observable.FromEvent<YourEventArgs>(
    h => SomeEvent += h, h => SomeEvent -= h); 
var runningSums = 
    events.Scan(Tuple.Create(new List<Tuple<int, DateTime>>(), 
          DateTime.Now - TimeSpan.FromMinutes(10)), 
       (l, e) => 
       { 
        var now = DateTime.Now; 
        l.Item1.Add(Tuple.Create(e.EventArgs.Nr, now)); 
        // if (trimming-condition) then trim front of list... 
        return Tuple.Create(l.Item1, now - TimeSpan.FromMinutes(10)); 
       }) 
      .Select(l => l.Item1.Where(t => t.Item2 > l.Item2).Sum(t => t.Item1)); 
runningSums.Subscribe(sum => Console.WriteLine(sum)); 
+0

不壞的肯定和做什麼它應該。但是你可能想要調查是否用每個新元素處理返回一個新列表是一個好主意。 – pointernil 2010-09-16 18:04:55

+1

我也不完全滿意。但是,您必須擁有包含時間戳的值列表才能執行此操作。儘管可變名單,這是一個完全有效的方法。你也可以做的不是每次創建一個新的列表,而是保持相同的列表,並使「Scan」方法的開始時間(過去總是10分鐘)成爲狀態的一部分。然而,現在名單隻有不斷增長,所以你必須找到一種方法來修剪前線。我也爲此添加了一個例子(沒有修剪,但可以很容易地添加)。 – 2010-09-16 18:32:02

1

好了,看看下面的解決方案。它建立在先前提出的解決方案的基礎上,但爲了效率(和可讀性,我認爲),卻放棄了純粹的功能風格。它重用藏漢內置的Timestamped類型跟蹤時間...

歡呼

public static class RxEntentsions 
     { 
      class TimeLimitedList<T> 
      { 
       public List<Timestamped<T>> Values = new List<Timestamped<T>>(); 
       TimeSpan span; 
       public TimeLimitedList(TimeSpan sp) { span = sp; } 
       public void Add(Timestamped<T> v) 
       { 
        Values.Add(v); 
        Values.RemoveAll(a => a.Timestamp < (DateTime.Now - span)); 
       } 
      } 

      public static IObservable<List<Timestamped<TSource>>> SlidingWindow<TSource>(this IObservable<Timestamped<TSource>> source, TimeSpan slidingWindow) 
      { 
       return source.Scan0(new TimeLimitedList<TSource>(slidingWindow), (acc, v) => { acc.Add(v); return acc; }).Select(a => a.Values); 
      } 
     } 


    static void Main(string[] args) 
    { 
     var gen = Observable.Interval(TimeSpan.FromSeconds(0.25d)).Timestamp(); 
     gen.SlidingWindow(TimeSpan.FromSeconds(1)).Subscribe(slw => {slw.ForEach(e=> Console.WriteLine(e)); Console.WriteLine("--------");}); 
     Console.ReadLine(); 
    }