2011-05-28 58 views
1

我正在寫一個函數IsAlive採取一種IObservable<T>和時間跨度的IsAlive,並返回一個IObservable<bool>的規範的使用情況是檢測一個流媒體服務器仍在發送數據。檢測上的IObservable

我已經爲它提出了以下解決方案,但覺得它不是最清楚它如何工作。

public static IObservable<bool> IsAlive<T>(this IObservable<T> source, 
              TimeSpan timeout, 
              IScheduler sched) 
{ 
    return source.Window(timeout, sched) 
       .Select(wind => wind.Any()) 
       .SelectMany(a => a) 
       .DistinctUntilChanged(); 
} 

有沒有人有更好的方法?

僅供參考 - 下面是我試過的單元測試和現有的方法:https://gist.github.com/997003

回答

1

這應該工作:

public static IObservable<bool> IsAlive<T>(this IObservable<T> source, 
              TimeSpan timeout, 
              IScheduler sched) 
{ 
    return source.Buffer(timeout, 1, sched) 
       .Select(l => l.Any()) 
       .DistinctUntilChanged(); 
} 

這種方法使語義也是如此。每次有物品進入時,它會填充緩衝區,然後傳遞true。每超時,一個空的緩衝區將被創建,並且false將被傳遞。

編輯:

這就是爲什麼緩衝-1方法比窗好:

var sched = new TestScheduler(); 
var subj = new Subject<Unit>(); 

var timeout = TimeSpan.FromTicks(10); 

subj 
    .Buffer(timeout, 1, sched) 
    .Select(Enumerable.Any) 
    .Subscribe(x => Console.WriteLine("Buffer(timeout, 1): " + x)); 

subj 
    .Window(timeout, sched) 
    .Select(wind => wind.Any()) 
    .SelectMany(a => a) 
    .Subscribe(x => Console.WriteLine("Window(timeout): "+x)); 

sched.AdvanceTo(5); 
subj.OnNext(Unit.Default); 
sched.AdvanceTo(16); 

產量:

Buffer(timeout, 1): True 
Window(timeout): True 
Buffer(timeout, 1): False 

具體而言,該窗口打開整個超時並且在物品進入時不會關閉和重置。這是1的緩衝區限制發揮作用的地方。只要有物品進入,緩衝區及其計時器就會重新啓動。

我可以重新實現我的緩衝區作爲窗口,因爲緩衝區的實現窗口,但a)我認爲緩衝區使語義更好,b)我不必SelectMany。 Scott的Select和SelectMany可以合併爲一個SelectMany(x => x.Any()),但是我可以避開整個lambda並指定Enumerable.Any方法組,它無論如何都會綁定得更快(無關緊要)。

+1

這與OP基本相同,除了使用'Buffer'而不是'Window'(前者使用後者來實現) – 2011-05-31 13:51:12

1

如何:

source.Select(_ => true) 
    .Timeout(timeout, sched) 
    .DistinctUntilChanged() 
    .Catch<bool, TimeoutException>)(ex => Observable.Return(false)); 
+0

在發生超時時間後開始的心跳信號不起作用。 – 2011-05-28 21:00:10

+0

如果你將錯誤'Concat'回到'Return',你可以使用'Retry'進行跟蹤。那麼我可能會把所有東西都包裝在一個'Publish(x => {...})'中來支持冷源。 – 2011-05-28 21:13:04

+0

我也會將'DistinctUntilChanged'移到'Publish'或至少'Catch'之後,否則你可能會得到錯誤的重複。 – 2011-05-28 21:19:20