2013-03-12 103 views
3

我在C#中使用Reactive Extensions(Rx)並希望以下面的方式過濾事件。想象一下,我有以下的鼻祖序列:Rx:如何緩衝事件(環形緩衝區)並且只在發生特殊事件時才刷新它們?

ABCDEF X GHI XĴXX KLMNO X P

我想產生以下的輸出:

EF X HI X J XX NO X

基本上,我會緩衝(油門?)事件與最大界限(在這個例子中這個界限是2),當我得到某個事件(在這種情況下事件X),我想刷新緩衝區輸出並重新開始緩衝,直到再次看到特殊事件。

我正在嘗試一些方法,但沒有任何運氣,我想應該有一個簡單的方法來完成它,我失蹤了。編輯:一個約束,是我期望得到TONS被拋棄的事件,並且只有X的幾個實例,所以在內存中保留一個具有數千個事件的緩衝區,只讀取最後2個(或20個)事件,並不是一個真正的選擇。

回答

0

這裏是一個刺來回答我的問題如果您發現任何問題,請告訴我。

public static class ObservableHelper 
{ 
    /// <summary> 
    /// Buffers entries that do no satisfy the <paramref name="shouldFlush"/> condition, using a circular buffer with a max 
    /// capacity. When an entry that satisfies the condition ocurrs, then it flushes the circular buffer and the new entry, 
    /// and starts buffering again. 
    /// </summary> 
    /// <typeparam name="T">The type of entry.</typeparam> 
    /// <param name="stream">The original stream of events.</param> 
    /// <param name="shouldFlush">The condition that defines whether the item and the buffered entries are flushed.</param> 
    /// <param name="bufferSize">The buffer size for accumulated entries.</param> 
    /// <returns>An observable that has this filtering capability.</returns> 
    public static IObservable<T> FlushOnTrigger<T>(this IObservable<T> stream, Func<T, bool> shouldFlush, int bufferSize) 
    { 
     if (stream == null) throw new ArgumentNullException("stream"); 
     if (shouldFlush == null) throw new ArgumentNullException("shouldFlush"); 
     if (bufferSize < 1) throw new ArgumentOutOfRangeException("bufferSize"); 

     return System.Reactive.Linq.Observable.Create<T>(observer => 
     { 
      var buffer = new CircularBuffer<T>(bufferSize); 
      var subscription = stream.Subscribe(
       newItem => 
        { 
         bool result; 
         try 
         { 
          result = shouldFlush(newItem); 
         } 
         catch (Exception ex) 
         { 
          return; 
         } 

         if (result) 
         { 
          foreach (var buffered in buffer.TakeAll()) 
          { 
           observer.OnNext(buffered); 
          } 

          observer.OnNext(newItem); 
         } 
         else 
         { 
          buffer.Add(newItem); 
         } 
        }, 
       observer.OnError, 
       observer.OnCompleted); 

      return subscription; 
     }); 
    } 
} 

順便說一下,CircularBuffer不是開箱即用,但實現很簡單。

然後我就叫:

 data 
      .FlushOnTrigger(item => item == 'X', bufferSize: 2) 
      .Subscribe(Console.WriteLine); 
+0

除了論證檢查之外,我只看到一個主要的疏漏。您不會傳遞OnError或OnCompleted通知。 (通過'observer.OnError'和'observer.OnCompleted'到'Subscribe'調用你應該在這裏工作。)我懷疑'Synchronize'調用總是需要的,並且通常會留給調用者來插入時需要。與問題的RX部分無關,但我期望'CircularBuffer'直接實現'IEnumerable',而不需要'ReadAll'方法。 – 2013-03-14 04:38:00

+1

這是什麼'Synchronize()'運算符? – AlexFoxGill 2013-03-14 09:17:55

+0

感謝您的反饋,我刪除了Synchronize調用。有一個TakeAll方法的原因是,我想清楚(並強制),當我讀取整個緩衝區時,它會自動清空(而不是枚舉項目,然後清除緩衝區) – 2013-08-23 20:49:20

1

爲方便起見,我們需要以下兩個擴展功能:

public static class Extensions 
{ 
    public static IObservable<IList<TSource>> BufferUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate) 
    { 
     var published = source.Publish().RefCount(); 
     return published.Buffer(() => published.Where(predicate)); 
    } 

    public static IEnumerable<TSource> TakeLast<TSource>(this IEnumerable<TSource> source, int count) 
    { 
     return source.Reverse().Take(count).Reverse(); 
    } 
} 

然後,我們解決問題,像這樣:

source.BufferUntil(c => c == 'X') 
    .SelectMany(list => list.TakeLast(3)) 

輸出:

E F X H I X J X X N O X 
+0

謝謝,它的工作,但在這裏使用一個緩衝意味着,直到你得到一個X,你將有可能緩衝數千個事件,只是爲了利用最新的。如果不需要的話,我想要一些甚至沒有提及過去事件的東西。 – 2013-03-13 21:31:38

1

我會捎帶上我在這裏發佈的另一個答案: Trouble Implementing a Sliding Window in Rx

最重要的一點是此擴展方法:

public static class Ext 
{ 
    public static IObservable<IList<T>> SlidingWindow<T>(
     this IObservable<T> src, 
     int windowSize) 
    { 
     var feed = src.Publish().RefCount();  
     // (skip 0) + (skip 1) + (skip 2) + ... + (skip nth) => return as list 
     return Observable.Zip(
     Enumerable.Range(0, windowSize) 
      .Select(skip => feed.Skip(skip)) 
      .ToArray()); 
    } 
} 

您可以正是如此使用:

void Main() 
{ 
    // A faked up source 
    var source = new Subject<char>(); 

    var bufferSize = 2; 
    Func<char, bool> eventTrigger = c => c == 'X'; 

    var query = source 
     .Publish() 
     .RefCount() 
     // Want one extra slot to detect the "event" 
     .SlidingWindow(bufferSize + 1) 
     .Where(window => eventTrigger(window.Last())) 
     .Select(buffer => buffer.ToObservable()) 
     .Switch(); 

    using(query.Subscribe(Console.WriteLine)) 
    { 
     source.OnNext('A'); 
     source.OnNext('B'); 
     source.OnNext('C'); 
     source.OnNext('D'); 
     source.OnNext('E'); 
     source.OnNext('F'); 
     source.OnNext('X'); 
     source.OnNext('G'); 
     source.OnNext('H'); 
     source.OnNext('I'); 
     source.OnNext('X'); 
     Console.ReadLine(); 
    }  
} 

輸出:

E 
F 
X 
H 
I 
X