2012-09-27 45 views
3

我正在使用連接到COM端口的Reactive Extensions Observable數據流,並且顯示在一定時間間隔內從該數據流獲取的緩衝區。如何在滿足特定值或條件時觸發緩衝Observable數據流?

這是我的基本Rx代碼,其中字節數據以25毫秒的塊返回。我想在第一次觸及特定閾值時觸發緩衝區的生成,然後只在收集完上一個緩衝區後纔再次執行。

var o = serialData.Buffer(TimeSpan.FromMilliseconds(25)) 
        .ObserveOn(SynchronizationContext.Current); 
var mySerialObserver = o.Subscribe<IList<byte>>(SubscribeAction()); 

serialData對象是來自USB COM端口的連續字節值流的IObservable。該代碼被改編自巴特迪斯梅特交:

How to implement SerialPort parser with Rx

使用Rx緩衝區(時間跨度)方法我可以品嚐serialData和在圖上顯示的緩衝器值(我的SubscribeAction方法內使用DynamicDataDisplay )。

我想將功能擴展爲像示波器觸發器一樣,這可能涉及在serialData值超過給定閾值但不收集重疊緩衝區時調用Rx緩衝區方法(這可能類似於示波器時基在特定輸入電壓下觸發,但在掃描完成之前不會再觸發)

請有人給我一些關於如何實現的想法嗎?

+0

我想你需要解釋一些關於你的問題/要求。 「閾」? 「集」? etc .. – Enigmativity

+0

我有一個IObservable流從一個COM端口 - 我從巴特代斯梅特代碼示例獲得的serialData –

+0

對不起,Enigmativity,我已經把更多的細節放在主帖子 –

回答

3

緩衝區只會釋放所有的值直到緩衝區關閉,這對於實時圖不是很有用。 您必須將值分成不重疊的窗口 - 從給定的觸發器開始,並在掃描條件完成時關閉 - 一個完整掃描週期的窗口。 不幸的是,窗口在啓動時仍會給我們值,所以我們將不得不跳過觸發器觸發前所有的值。

static IObservable<IObservable<T>> TriggeredSweep<T>(
     this IObservable<T> source, 
     Func<T, bool> triggerCondition, 
     Func<T, bool> sweepEnd 
     ) 
    { 
     source = source.Publish().RefCount(); 
     return source.Window(() => source.Where(triggerCondition).Sample(source.Where(sweepEnd))) 
        .Select(s => s.SkipWhile(v => !triggerCondition(v))); 

    } 

測試了這一點,最好的辦法是在其上這個的前提是非常示波器型號:

 double period = 1000/0.5; //0.5 Hz 
     int cycles = 4;    //cycles to display 
     int quantization = 100;  //cycles to display    
     int amplitude = 10;   //signal peak    

     int range = quantization * cycles; //full range 

     //Sine wave generator for n cycles 
     //makes tuple of (t, sin(t)) 
     var source = Observable.Interval(TimeSpan.FromMilliseconds(period/range)) 
           .Select(s => s % (range + 1)) 
           .Select(s => Tuple.Create(s, amplitude * Math.Sin((double)s/((double)range/(double)cycles) * 2 * Math.PI))); 


     source.TriggeredSweep(
      value => value.Item2 > 5, //Trigger when Signal value > 5 
      value => value.Item1/quantization >= cycles //end sweep when all cycles are done 
      ) 
       .Subscribe(window => 
       { 
        Console.Clear(); //Clear CRO Monitor 

        window.Subscribe(value => 
        { 
         //Set (x, y) 
         Console.CursorLeft = (int)((double)value.Item1/range * (Console.WindowWidth - 1)); 
         Console.CursorTop = (int)((amplitude - value.Item2)/(2 * amplitude) * (Console.WindowHeight - 1)); 

         //draw 
         Console.Write("x"); 
        }); 
       }); 

     //prevent close 
     Console.ReadLine(); 

輸出:

xxx     xxxx     xxx     xxxx 
    xx x     x x     xx x     x x 
    x x     xx x     x x     xx x 
    xx x    x x    xx x    x x 
    x  x    xx  x    x  x    xx  x 
    x  x    x  x    x  x    x  x 
    x  x    x  x    xx  x    x  x 
     x    x  x    x  x    x  x 
     x    x  x    x  x    x  x 
      x   x  x    x  x   x  x 
      x   x  xx   x   x   x  xx 
      x   x   x   x   x   x   x 
      x   x   x   x   x   x   x 
      x   x   x   x   x   x   x   x 
      x   x   x   x   x   x   x   x 
      x   x   x   x   x   x   x   x 
      xx  x   x   x   xx  x   x   x 
      x  x   x  x    x  x   x  x 
      x  x    x  x    x  x    x  x 
      x  x    x  x    x  x    x  x 
      x  x    x  xx    x  x    x  xx 
      x  x    x  x    x  x    x  x 
      x  xx    x  x    x  xx    x  x 
       x x    x xx    x x    x xx 
       x xx     x x     x xx     x x 
       x x     x xx     x x     x xx 
       xxxx     xxx     xxxx     xxx 
       x      x      x      x 

我希望這些代碼可能是有用的使用Rx測試簡單的信號處理功能。 :)

+0

感謝您的解決方案,它是帶我一段時間讓我的頭在Reactive Extensions附近,你的例子確實有幫助。我正在做一個簡單的假設,即在一個緩衝區中收集的所有數據在時間間隔內均勻分佈。 –

+0

@CliveG沒問題。順便說一句,如果你沒有問題,你可以把它標記爲答案。 – Asti

相關問題