2
我想在Rx.NET中實現一個信號過濾器,它以一組初始係數開始。隨着時間的推移,必須從觀測數據值的快照重新計算濾波器係數。使用Rx.NET的信號過濾器
這是一個小的原型,它顯示了它應該如何工作。爲了簡單起見,我選擇濾波器長度和用於重新計算濾波器係數的歷史數據值的數量(在本例中爲3)。
該示例使用bufferedAt10
中的副作用重新計算係數。這不是我想要的。
在實際應用中,數據以不規則的時間步進行,係數應該在特定時間每天更新一次或更新一次。我可以很容易地使緩衝區更長,但是我怎樣讓系統運行並以清晰的功能方式從觀察者處改變濾波器係數?
// create a hot obvervable to produce data
const int bufLen = 3;
var rng = new Random();
var period = TimeSpan.FromSeconds(0.5);
var observable = Observable.Interval(period)
.Select(i => new {Time = DateTime.Now, Price = rng.NextDouble()})
.Do(e => Console.WriteLine("original : {0}", e))
.Publish();
observable.Connect();
Console.WriteLine("Press any key to subscribe");
Console.ReadKey();
// buffer of length bufLen used for filter calculation (every tick) and filter
// coefficient update (at a lower frequency)
var buffered = observable.Buffer(bufLen, 1);
// apply the signal filter with coefficients in `coeff`
var coeff = new List<Double>() {1.0, 1.0, 1.0}; // these will be updated on the way from new data
var filtered = buffered.Select(e =>
{
var f = 0.0;
for (var i = 0; i < bufLen; i++)
{
f += e[i].Price*coeff[i]; // apply the filter with coefficients `coeff`
}
return new {Time = DateTime.Now, FilteredPrice = f};
});
var s1 = filtered.Subscribe(e => Console.WriteLine("filtered : {0} (coeff {1},{2},{3})", e, coeff[0], coeff[1], coeff[2]));
// recalculate the filter coefficients say every 10 seconds
var bufferedAt10 = buffered.DistinctUntilChanged(e => (e[bufLen - 1].Time.TimeOfDay.Seconds/10) * 10);
var s2 = bufferedAt10.Subscribe(e =>
{
Console.WriteLine("recalc of coeff : {0}", e[bufLen - 1].Time);
for (var i = 0; i < bufLen; i++)
{
// a prototypical function that takes the buffer and uses it to "recalibrate" the filter coefficients
coeff[i] = coeff[i] + e[bufLen - 1 - i].Price;
}
Console.WriteLine("updated coeffs to {0},{1},{2}", coeff[0], coeff[1], coeff[2]);
});
感謝您的任何好建議。
非常感謝,我明白了。我想'WithLatestFrom'應該是'CombineLatest'。 – Daniel
@Daniel'CombineLatest'將觸發源* Observable * *。 'WithLatestFrom'只有在第一個源發出時纔會觸發。它只能在預發佈版本2.3.0-beta中提供 – paulpdaniels