2017-08-25 39 views
1

我有一個遠程程序,它通過套接字連接每10毫秒發送一次更新的度量。在我的客戶端程序中,我將這個套接字包裝在一個可產生這些測量結果的觀察值中對於我的用例來說,測量到達10毫秒間隔很重要。當然,這種情況不會發生,因爲網絡延遲會使信息早或晚地到達每條信息。無效編程規範化時間序列值

所以基本上我在遠程PC上有一個程序在套接字連接上發送它。

--爲10毫秒

o--o--o--o--o--o--o--o--o--o--... 

這成爲該我的客戶端上,由於網絡延遲。

o-o---o-o--o---o--o-o--o-o-... 

現在我可觀察到我想「正常化」這個,所以它會每10毫秒再次發射一個值。

--o--o--o--o--o--o--o--o--o--o... 

當然這意味着我將不得不引入一個緩衝時間,它將存儲值並在10毫秒間隔內發射它們。有什麼辦法可以完成這個嗎?

下面是一些測試代碼,它將根據我上面描述的方式發出事件。

using System; 
using System.Collections.Generic; 
using System.Reactive.Disposables; 
using System.Reactive.Linq; 
using System.Threading.Tasks; 
using Microsoft.Reactive.Testing; 

public class Program 
{ 
    protected static event EventHandler<EventArgs> CancelEvent; 

    private static Random random = new Random(); 

    private static double GetRandomNumber(double minimum, double maximum) 
    { 
     return random.NextDouble() * (maximum - minimum) + minimum; 
    } 

    public static void Main() 
    { 
     var completed = false; 

     var scheduler = new TestScheduler(); 

     var observable = Observable 
      .Interval(TimeSpan.FromMilliseconds(7.0), scheduler) 
      .SelectMany(e => Observable 
       .Return(e, scheduler) 
       .Delay(TimeSpan.FromMilliseconds(GetRandomNumber(0.0, 6.0)), scheduler) 
      ) 
      .TimeInterval(scheduler) 
      .Select(t => t.Interval.Milliseconds); 

     var fromEvent = Observable.FromEventPattern<EventArgs>(
      p => CancelEvent += p, 
      p => CancelEvent -= p, 
      scheduler 
     ); 

     var cancellable = observable.TakeUntil(fromEvent); 

     var results = new List<int>(); 

     using (cancellable.Subscribe(
      results.Add, 
      e => { throw new Exception("No exception is planned! {0}", e); }, 
      () => { completed = true; }) 
     ) 
     { 
      scheduler.AdvanceBy(TimeSpan.FromSeconds(3.5).Ticks); 
      CancelEvent(null, new EventArgs()); 
      scheduler.AdvanceBy(TimeSpan.FromSeconds(3).Ticks); 
     } 

     Console.WriteLine("Have I completed indeed? {0}", completed); 
     Console.WriteLine("What emit time deltas been registered before cancellation?\n\t{0}", string.Join("ms\n\t", results)); 
    } 
} 
+0

您是否可以簡單地用'.Zip'將它們與。間隔(TimeSpan.FromMilliseconds(10.0))'? – Enigmativity

+0

另外,請記住,Windows不是**實時操作系統**,無論您做什麼,都無法將計時降至10毫秒。 – Enigmativity

+0

僅當間隔始終比源更慢時才能使用Zip。如果有很長時間的延遲,接下來是爆發,那麼他們都會同時出現。 – Shlomo

回答

2

這在理論上與A way to push buffered events in even intervals相似。

這種解決辦法是這樣的:

var source = new Subject<double>(); 
var bufferTime = TimeSpan.FromMilliseconds(100); 
var normalizedSource = source 
    .Delay(bufferTime) 
    .Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromMilliseconds(10))); 

...與Drain定義如下:

public static class ObservableDrainExtensions 
{ 
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
     Func<TSource, IObservable<TOut>> selector) 
    { 
     return Observable.Defer(() => 
     { 
      BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); 

      return source 
       .Zip(queue, (v, q) => v) 
       .SelectMany(v => selector(v) 
        .Do(_ => { },() => queue.OnNext(new Unit())) 
       ); 
     }); 
    } 
} 

不過,我想你會碰到與10毫秒的問題預選賽。這個時間太短。如果我沒有記錯的話,任何小於15ms的延遲都會被調度程序忽略並立即觸發。鑑於此,即使您使用了較大的時間間隔(我嘗試了100 ms),由於操作系統上下文切換等原因,您將得到一些差異。