使用Observable.Buffer(TimeSpan timeSpan) Method
,其將流進10分鐘塊及申報IList
,工作正常的Rx - 開始緩衝onNext
var stream = Observable.FromEventPattern<*>(*,*);
stream.Buffer(TimeSpan.FromSeconds(10));
試圖實現更復雜的行爲
- 開始區塊(緩衝的事件列表)當新事件被推入流(而不是每10秒)
- 繼續,直到任何情況下被推到流爲
x
秒
使用Observable.Buffer(TimeSpan timeSpan) Method
,其將流進10分鐘塊及申報IList
,工作正常的Rx - 開始緩衝onNext
var stream = Observable.FromEventPattern<*>(*,*);
stream.Buffer(TimeSpan.FromSeconds(10));
試圖實現更復雜的行爲
x
秒緩衝事件試試這個:
var query =
stream.Publish(
ps => ps.Window(
() => ps.Delay(TimeSpan.FromSeconds(1.0)).Take(1)));
我相信有相當你能做到這一點的幾種方式。
我這裏有一個工作測試例子
void Main()
{
var scheduler = new TestScheduler();
var stream = scheduler.CreateColdObservable(
ReactiveTest.OnNext(1.Seconds(), 'A'),
ReactiveTest.OnNext(2.Seconds(), 'B'),
ReactiveTest.OnNext(13.Seconds(), 'C')
);
var observer = scheduler.CreateObserver<string>();
var query = stream.Publish(s => {
return s.Timeout(TimeSpan.FromSeconds(10), Observable.Empty<char>(), scheduler)
.ToList()
.Where(buffer=>buffer.Any())
//Project to string to make equality test easier for the example.
.Select(buffer=>string.Join(",", buffer))
.Repeat();
});
query.Subscribe(observer);
scheduler.AdvanceBy(100.Seconds());
ReactiveAssert.AreElementsEqual(
new []{
ReactiveTest.OnNext(12.Seconds(), "A,B"),
ReactiveTest.OnNext(23.Seconds(), "C")
},
observer.Messages);
}
// Define other methods and classes here
public static class TimeEx
{
public static long Seconds(this int seconds)
{
return TimeSpan.FromSeconds(seconds).Ticks;
}
}
注意在這裏,我只是做了緩衝列表的字符串,使其更容易驗證的平等。即"A,B"
代替{'A', 'B'}
其他可供考慮的方案是Window
或GroupJoin
運營商能夠做到這一點 - 看到http://www.introtorx.com/content/v1.0.10621.0/17_SequencesOfCoincidence.html。我確信其他運營商可以像Switch
,Select
,Timer
,Timeout
等縫合在一起,以獲得您的結果。
不確定這裏有什麼問題 – Mathieu