這應該工作:
public static IObservable<bool> IsAlive<T>(this IObservable<T> source,
TimeSpan timeout,
IScheduler sched)
{
return source.Buffer(timeout, 1, sched)
.Select(l => l.Any())
.DistinctUntilChanged();
}
這種方法使語義也是如此。每次有物品進入時,它會填充緩衝區,然後傳遞true。每超時,一個空的緩衝區將被創建,並且false將被傳遞。
編輯:
這就是爲什麼緩衝-1方法比窗好:
var sched = new TestScheduler();
var subj = new Subject<Unit>();
var timeout = TimeSpan.FromTicks(10);
subj
.Buffer(timeout, 1, sched)
.Select(Enumerable.Any)
.Subscribe(x => Console.WriteLine("Buffer(timeout, 1): " + x));
subj
.Window(timeout, sched)
.Select(wind => wind.Any())
.SelectMany(a => a)
.Subscribe(x => Console.WriteLine("Window(timeout): "+x));
sched.AdvanceTo(5);
subj.OnNext(Unit.Default);
sched.AdvanceTo(16);
產量:
Buffer(timeout, 1): True
Window(timeout): True
Buffer(timeout, 1): False
具體而言,該窗口打開整個超時並且在物品進入時不會關閉和重置。這是1的緩衝區限制發揮作用的地方。只要有物品進入,緩衝區及其計時器就會重新啓動。
我可以重新實現我的緩衝區作爲窗口,因爲緩衝區的實現是窗口,但a)我認爲緩衝區使語義更好,b)我不必SelectMany。 Scott的Select和SelectMany可以合併爲一個SelectMany(x => x.Any()),但是我可以避開整個lambda並指定Enumerable.Any方法組,它無論如何都會綁定得更快(無關緊要)。
這與OP基本相同,除了使用'Buffer'而不是'Window'(前者使用後者來實現) – 2011-05-31 13:51:12