2016-06-14 37 views
0

使用Observable.Buffer(TimeSpan timeSpan) Method,其將流進10分鐘塊及申報IList,工作正常的Rx - 開始緩衝onNext

var stream = Observable.FromEventPattern<*>(*,*); 
stream.Buffer(TimeSpan.FromSeconds(10)); 

試圖實現更復雜的行爲

  • 開始區塊(緩衝的事件列表)當新事件被推入流(而不是每10秒)
  • 繼續,直到任何情況下被推到流爲x
+0

不確定這裏有什麼問題 – Mathieu

回答

0

緩衝事件試試這個:

var query = 
    stream.Publish(
     ps => ps.Window(
      () => ps.Delay(TimeSpan.FromSeconds(1.0)).Take(1))); 
0

我相信有相當你能做到這一點的幾種方式。

我這裏有一個工作測試例子

void Main() 
{ 
    var scheduler = new TestScheduler(); 
    var stream = scheduler.CreateColdObservable(
     ReactiveTest.OnNext(1.Seconds(), 'A'), 
     ReactiveTest.OnNext(2.Seconds(), 'B'), 
     ReactiveTest.OnNext(13.Seconds(), 'C') 
     ); 

    var observer = scheduler.CreateObserver<string>(); 

    var query = stream.Publish(s => { 
     return s.Timeout(TimeSpan.FromSeconds(10), Observable.Empty<char>(), scheduler) 
      .ToList() 
      .Where(buffer=>buffer.Any())  
      //Project to string to make equality test easier for the example.  
      .Select(buffer=>string.Join(",", buffer)) 
      .Repeat(); 
    }); 

    query.Subscribe(observer); 

    scheduler.AdvanceBy(100.Seconds()); 

    ReactiveAssert.AreElementsEqual(
     new []{ 
      ReactiveTest.OnNext(12.Seconds(), "A,B"), 
      ReactiveTest.OnNext(23.Seconds(), "C") 
     }, 
     observer.Messages); 
} 

// Define other methods and classes here 
public static class TimeEx 
{ 
    public static long Seconds(this int seconds) 
    { 
     return TimeSpan.FromSeconds(seconds).Ticks; 
    } 
} 

注意在這裏,我只是做了緩衝列表的字符串,使其更容易驗證的平等。即"A,B"代替{'A', 'B'}

其他可供考慮的方案是WindowGroupJoin運營商能夠做到這一點 - 看到http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html。我確信其他運營商可以像SwitchSelectTimer,Timeout等縫合在一起,以獲得您的結果。