2015-11-02 93 views





 // Create a hot observable that produces a timestamped random price every `period`.
     const int bufLen = 3;
     var rng = new Random();
     var period = TimeSpan.FromSeconds(0.5);
     var observable = Observable.Interval(period)
         .Select(i => new { Time = DateTime.Now, Price = rng.NextDouble() })
         .Do(e => Console.WriteLine("original : {0}", e))
         // Observable.Interval is cold: without sharing, each subscription below would
         // start its own timer and draw its own random prices, so the coefficient
         // update would be computed from different data than the filter sees.
         // Publish + RefCount multicasts a single underlying sequence to all subscribers.
         .Publish()
         .RefCount();

     Console.WriteLine("Press any key to subscribe");

     // Sliding window of length bufLen, advanced by one element per tick. It is used
     // both for the filter calculation (every tick) and for the filter coefficient
     // update (at a lower frequency).
     var buffered = observable.Buffer(bufLen, 1);

     // Apply the signal filter with the coefficients in `coeff`.
     // `coeff` is mutated in place by the recalibration subscription further down.
     var coeff = new List<double> { 1.0, 1.0, 1.0 };
     var filtered = buffered.Select(e =>
     {
         var f = 0.0;
         for (var i = 0; i < bufLen; i++)
         {
             // Weighted sum of the window using the current coefficients.
             f += e[i].Price * coeff[i];
         }

         return new { Time = DateTime.Now, FilteredPrice = f };
     });

     var s1 = filtered.Subscribe(e => Console.WriteLine("filtered : {0} (coeff {1},{2},{3})", e, coeff[0], coeff[1], coeff[2]));

     // Recalculate the filter coefficients roughly every 10 seconds: the key is the
     // seconds component of the newest sample rounded down to a multiple of 10, so a
     // window passes through only when that 10-second bucket changes (the very first
     // window always passes).
     var bufferedAt10 = buffered.DistinctUntilChanged(e => (e[bufLen - 1].Time.TimeOfDay.Seconds / 10) * 10);

     var s2 = bufferedAt10.Subscribe(e =>
     {
         Console.WriteLine("recalc of coeff : {0}", e[bufLen - 1].Time);
         for (var i = 0; i < bufLen; i++)
         {
             // A prototypical function that takes the buffer and uses it to
             // "recalibrate" the filter coefficients (newest sample pairs with coeff[0]).
             coeff[i] = coeff[i] + e[bufLen - 1 - i].Price;
         }

         Console.WriteLine("updated coeffs to {0},{1},{2}", coeff[0], coeff[1], coeff[2]);
     });





// Filter a stream of random prices with coefficients that are themselves derived
// from the stream, without any shared mutable state.
const int bufLen = 3;
var rng = new Random();
var period = TimeSpan.FromSeconds(0.5);
var observable = Observable.Interval(period)
    .Select(_ => rng.NextDouble())
    // Observable.Interval is cold; share one underlying sequence so the coefficient
    // pipeline and the filter pipeline below observe the same prices.
    .Publish()
    .RefCount();

Console.WriteLine("Press any key to subscribe");

// Sliding window of the last bufLen prices, advanced by one element per tick.
var buffered = observable.Buffer(bufLen, 1);

var seed = new[] { 1.0, 1.0, 1.0 };

var coefficients = buffered
    // Samples for a new value every 10 seconds
    .Sample(TimeSpan.FromSeconds(10))
    // Updates the seed value and emits it after every update
    .Scan(seed,
        // Use good old fashioned LINQ: the newest price pairs with the first
        // coefficient. ToArray keeps the accumulator type equal to the seed type.
        (coeff, delta) => coeff.Zip(delta.Reverse(), (c, d) => c + d).ToArray());

// Emits a new value every time `buffered` emits, and combines it with the latest
// value from the coefficients observable.
// Kick off coefficients with the seed, otherwise you need to wait 10 seconds
// for the first value.
buffered.WithLatestFrom(coefficients.StartWith(seed), (e, coeff) =>
    {
        return e.Zip(coeff, (x, c) => x * c).Sum();
    })
    .Subscribe(e => Console.WriteLine("filtered : {0}", e));

非常感謝,我明白了。我想'WithLatestFrom'應該是'CombineLatest'。 – Daniel


@Daniel `CombineLatest` 會在*任一*來源 Observable 發出值時觸發;`WithLatestFrom` 只有在第一個來源發出值時才會觸發。它目前只在預發佈版本 2.3.0-beta 中提供。 – paulpdaniels