2013-03-20 53 views
0

我需要反彈輸入流。Rx反彈輸入

在狀態1的第一次出現時,我需要等待5秒鐘並確認laste狀態是否也爲1. 只有比我有穩定的信號。

(time) 0-1-2-3-4-5-6-7-8-9 
(state) 0-0-0-0-0-1-0-1-0-1 
(result)     -> 1 

這裏是一個非穩定信號的例子。

(time) 0-1-2-3-4-5-6-7-8-9 
(state) 0-0-0-0-0-1-0-1-0-0 
(result)     -> 0 

我試圖用一個緩衝區,但緩衝區有固定的出發點,我需要等待開始我的第一個事件5秒。

+0

如果示例2中的下一個狀態爲0-1,您會希望結果如何? – 2013-03-20 23:41:41

+0

每秒產生一個輸出嗎?我同意下面的JerKilmball,一些關於你的用例的更多信息會有幫助 – AlexFoxGill 2013-03-21 08:57:21

回答

3

以你的要求的字面

在狀態1的第一次出現,我需要等待5秒鐘, 驗證是否樂特狀態也1.只比我有一份穩定的 信號。

我可以想出幾種方法來解決這個問題。 爲了澄清我的假設,您只需要推出第一次出現1次後5秒產生的最後一個值。這將導致產生0或1的單個值序列(即,不管過去產生的任何其他值距離源序列5秒)

這裏我用一些jiggery-pokery重新創建序列。

var source = Observable.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1)) 
    .Take(10) 
    .Select(i=>{if(i==5 || i==7 || i==9){return 1;}else{return 0;}}); //Should produce 1; 
    //.Select(i=>{if(i==5 || i==7){return 1;}else{return 0;}}); //Should produce 0; 

下面的所有選項看起來共享序列。要在Rx中安全地共享一個序列,我們發佈()並連接它。我通過RefCount()運算符使用自動連接。

var sharedSource = source.Publish().RefCount(); 

1)在此解決方案中,我們取第一個值1,然後將選定的序列值緩衝到5秒的緩衝區大小。我們只取這些緩衝區中的第一個。一旦我們得到這個緩衝區,我們得到最後一個值並推送它。如果緩衝區是空的,我假定我們推動一個,因爲最後一個值是'1',啓動緩衝區運行。

sharedSource.Where(state=>state==1) 
      .Take(1) 
      .SelectMany(_=>sharedSource.Buffer(TimeSpan.FromSeconds(5)).Take(1)) 
      .Select(buffer=> 
      { 
       if(buffer.Any()) 
       { 
        return buffer.Last(); 
       } 
       else{ 
        return 1; 
       } 
      }) 
      .Dump(); 

2)在這個解決方案,我採取的方法只有一次,我們得到一個有效的值(1),開始聽,然後直到定時器觸發終止採取一切值。從這裏我們拿出最後一個值。

var fromFirstValid = sharedSource.SkipWhile(state=>state==0); 
fromFirstValid 
    .TakeUntil(
     fromFirstValid.Take(1) 
        .SelectMany(_=>Observable.Timer(TimeSpan.FromSeconds(5)))) 
    .TakeLast(1) 
    .Dump(); 

3)在該溶液I使用窗口操作者創建時的「1」的第一個值發生打開一個窗口,然後關閉時5秒爲止。我們再次取最後一個值

sharedSource.Window(
       sharedSource.Where(state=>state==1), 
       _=>Observable.Timer(TimeSpan.FromSeconds(5))) 
      .SelectMany(window=>window.TakeLast(1)) 
      .Take(1) 
      .Dump(); 

所以很多不同的方法來皮膚貓。

+0

+1 - 很好的答案!你知道,我粘貼一個鏈接到你的文章窗口/加入/等約三次血腥的一週... :) – JerKimball 2013-03-22 18:05:21

+0

嗚呼!我非常激動,它爲你提供了價值。 – 2013-03-22 18:06:21

+0

太棒了!我使用瞭解決方案2.如果您每天都不使用Rx,那麼很難將您的大腦切換到Rx模式。謝謝! – 2013-04-08 18:43:14

1

這聽起來(一覽)像你想Throttle,不Buffer,雖然您的使用情況下,一些詳細信息,將有助於引腳下來 - 無論如何,這裏是你會如何Throttle您的直播

void Main() 
{ 
    var subject = new Subject<int>(); 
    var source = subject.Publish().RefCount(); 

    var query = source 
     // Start counting on a 1, wait 5 seconds, and take the last value 
     .Throttle(x => Observable.Timer(TimeSpan.FromSeconds(5))); 

    using(query.Subscribe(Console.WriteLine)) 
    { 
     // This sequence should produce a one 
     subject.OnNext(1); 
     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.OnNext(1); 
     Console.ReadLine(); 
     // This sequence should produce a zero 
     subject.OnNext(0); 
     subject.OnNext(0); 
     subject.OnNext(0); 
     subject.OnNext(0); 
     subject.OnNext(1); 
     subject.OnNext(0); 
     Console.ReadLine(); 
    } 
}