2012-07-19 67 views
4

我有一個股票報價序列進來,我想在最後一小時採取所有的數據,並對其做一些處理。我試圖用反應式擴展2.0實現這一點。我在另一篇文章上閱讀使用Interval,但我認爲這已被棄用。無功擴展滑動時間窗口

+2

你想有價值的最後一個小時,每次有新價值時,或者你只是想每小時價值一小時的股票價格? – Enigmativity 2012-07-20 01:32:24

+0

我希望在每次有新值出現時每次值的最後一小時。我已經看過Buffer,但我認爲它不是正確的。 – 2012-07-25 08:02:21

回答

8

這會擴展方法解決問題了嗎?

public static IObservable<T[]> RollingBuffer<T>(
    this IObservable<T> @this, 
    TimeSpan buffering) 
{ 
    return Observable.Create<T[]>(o => 
    { 
     var list = new LinkedList<Timestamped<T>>(); 
     return @this.Timestamp().Subscribe(tx => 
     { 
      list.AddLast(tx); 
      while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering)) 
      { 
       list.RemoveFirst(); 
      } 
      o.OnNext(list.Select(tx2 => tx2.Value).ToArray()); 
     }, ex => o.OnError(ex),() => o.OnCompleted()); 
    }); 
} 
1

很可能Buffer是你在找什麼:

var hourlyBatch = ticks.Buffer(TimeSpan.FromHours(1)); 
+1

我認爲這會產生1小時數據的非重疊窗口,而我基本上每次有新值進入時都需要一個滑動窗口。 – 2012-07-25 08:04:00

+1

啊......這就是ticks.Window(TimeSpan.FromHouse(1))。 – 2012-07-25 15:10:35

+0

這將給1小時窗口,但不會重疊窗口。您需要提供窗口打開和關閉觀察值,或跳過計數/次。 – 2015-07-07 06:24:34

3

您正在尋找窗口操作員! 這是一個漫長的文章中,我與巧合的序列工作(重疊序列之窗) http://introtorx.com/Content/v1.0.10621.0/17_SequencesOfCoincidence.html

所以寫了,如果你想建立你可以使用這種代碼的滾動平均值

var scheduler = new TestScheduler(); 
var notifications = new Recorded<Notification<double>>[30]; 
for (int i = 0; i < notifications.Length; i++) 
{ 
    notifications[i] = new Recorded<Notification<double>>(i*1000000, Notification.CreateOnNext<double>(i)); 
} 
//Push values into an observable sequence 0.1 seconds apart with values from 0 to 30 
var source = scheduler.CreateHotObservable(notifications); 

source.GroupJoin(
     source, //Take values from myself 
     _=>Observable.Return(0, scheduler), //Just the first value 
     _=>Observable.Timer(TimeSpan.FromSeconds(1), scheduler),//Window period, change to 1hour 
     (lhs, rhs)=>rhs.Sum()) //Aggregation you want to do. 
    .Subscribe(i=>Console.WriteLine (i)); 
scheduler.Start(); 

而且我們可以看到它在輸入值時輸出滾動條。

0,1,3,6,10,15,21,28 ...

+0

我無法獲得此代碼的工作。 '''rhs.Sum()'''生成'''IObservable ''''不是''double''',所以我假設'''.Concat()'''是否也需要?但即使只有零輸出 – 2017-09-07 13:51:35

+0

道歉,我糾正了答案。當使用'TestScheduler'確保準確時,Observable.Return(0)需要傳入的調度器,即Observable.Return(0,scheduler)。也許更有意義的值是Observable.Return(Unit.Default,scheduler)'或'Observable.Empty (scheduler)' – 2017-09-07 23:08:10

0

或者假設數據已經Timestamp版,只需使用Scan

public static IObservable<IReadOnlyList<Timestamped<T>>> SlidingWindow<T>(this IObservable<Timestamped<T>> self, TimeSpan length) 
    { 
     return self.Scan(new LinkedList<Timestamped<T>>(), 
         (ll, newSample) => 
         { 
          ll.AddLast(newSample); 
          var oldest = newSample.Timestamp - length; 
          while (ll.Count > 0 && list.First.Value.Timestamp < oldest) 
           list.RemoveFirst(); 

          return list; 
         }).Select(l => l.ToList().AsReadOnly()); 
    }