2011-11-09 134 views
5

如果在最近5秒內收到UDP消息,並且如果發生超時,則返回false,我試圖創建一個IObservable<bool>返回true反應性擴展超時不停止序列?

到目前爲止,我有這樣的:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    var udp = BaseComms.UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Select(s => true); 

    return Observable 
     .Timeout(udp, TimeSpan.FromSeconds(5)) 
     .Catch(Observable.Return(false)); 
} 

的這個問題是: -

  • 一旦返回false,序列停止
  • 我只是真的需要truefalse狀態變化。

我可以使用Subject<T>但我需要小心處理UDPBaseStringListener觀察,當沒有更多的用戶。

更新

每次我得到一個UDP消息,我想它返回一個true。如果我在最近5秒內沒有收到UDP消息,我希望它返回false

+0

FYI,'Timeout'具有重載採用可觀察到的一個替代,用於當時間發生超時而不是「投擲」並需要「捕捉」。 –

+0

讀者可能也有興趣[1](http://stackoverflow.com/q/23394441/1267663),[2](http://stackoverflow.com/q/12786901/1267663)和[3]( http://stackoverflow.com/q/35873244/1267663)。 – Whymarrh

回答

3

正如Bj0指出的那樣,BufferWithTime的解決方案一收到數據點就不會返回數據點,並且在接收到數據點後緩衝區超時未被重置。

其中R x擴展2.0,您可以解決用新緩衝液兩個問題接受過載都超時和大小:

static IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return BaseComms 
     .UDPBaseStringListener(localEP) 
     .Where(msg => msg.Data.Contains("running")) 
     .Buffer(TimeSpan.FromSeconds(5), 1) 
     .Select(s => s.Count > 0) 
     .DistinctUntilChanged(); 
} 
1

我建議避免使用Timeout - 它會導致異常並且編寫異常是不好的。

此外,它似乎只是有道理的,你的observable停止後一個值。你可能需要更多地解釋你想要的行爲。

我目前的解決問題的方法是:

public IObservable<Boolean> GettingUDPMessages(IPEndPoint localEP) 
{ 
    return Observable.Create<bool>(o => 
    { 
     var subject = new AsyncSubject<bool>(); 
     return new CompositeDisposable(
      Observable.Amb(
       BaseComms 
        .UDPBaseStringListener(localEP) 
        .Where(msg => msg.Data.Contains("running")) 
        .Select(s => true), 
       Observable 
        .Timer(TimeSpan.FromMilliseconds(10.0)) 
        .Select(_ => false) 
      ).Take(1).Subscribe(subject), subject.Subscribe(o)); 
    }); 
} 

這是否幫助?

+0

這不會編譯。 Observable.Create預計將返回一個Action ... – Tim

+0

'Timeout'只會在默認情況下導致異常。它具有一個重載,如果達到超時,則使用另一個可觀察值,默認的重載將使用'Observable.Throw'作爲備用。 –

+0

@Jim - 您必須使用不同版本的Rx - 它在v1.1.10621中編譯得很好。 – Enigmativity

1

如果你不希望序列停止,只是把它包在推遲+重複:

Observable.Defer(() => GettingUDPMessages(endpoint) 
    .Repeat(); 
2

與緩衝的問題是,當你得到一個在「超時」的時間間隔沒有得到復位新值,緩衝窗口只是時間片(在這種情況下是5秒),它們相互跟隨。這意味着,根據您何時收到最後一個值,您可能需要等待幾乎兩倍的超時值。這也可以錯過超時:

   should timeout here 
         v 
0s   5s   10s  15s 
|x - x - x | x - - - - | - - - x -| ... 
      true  true  true 

IObservable.Throttle,然而,每一個新的價值在於,只有產生時間跨度過後的值(最後一個進來的值)的時間復原。這可以用於爲超時,並與的IObservable合併以插入「超時」值到流:

var obs = BaseComms.UDPBaseStringListener(localEP) 
      .Where(msg => msg.Data.Contains("running")); 

return obs.Merge(obs.Throttle(TimeSpan.FromSeconds(5)) 
         .Select(x => false)) 
      .DistinctUntilChanged(); 

一個工作LINQPad例如:

var sub = new Subject<int>(); 

var script = sub.Timestamp() 
    .Merge(sub.Throttle(TimeSpan.FromSeconds(2)).Select(i => -1).Timestamp()) 
    .Subscribe(x => 
{ 
    x.Dump("val"); 
}); 


Thread.Sleep(1000); 

sub.OnNext(1); 
sub.OnNext(2); 

Thread.Sleep(10000); 

sub.OnNext(5); 

A -1被後插入到流中2秒超時。