2011-10-28 18 views
1

我想獲得一個可觀察的20個最新值,揭露它作爲屬性不會阻塞的發生。此刻,我的代碼如下所示:的IObservable TakeLast(n)和阻塞

class Foo 
{ 
    private IObservable<int> observable; 

    public Foo(IObservable<int> bar) 
    { 
     this.observable = bar; 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      return this.observable.TakeLast(20).ToEnumerable(); 
     } 
    } 
} 

然而,當MostRecentBars調用getter時,這是堵,大概是因爲ToEnumerable不會返回,直到至少有20個觀測值。

有暴露最多20個最新的可觀測值的無阻塞一個內置的方式嗎?如果有少於20個觀測值,那麼它應該返回所有這些值。

+0

有沒有所謂的TakeLast方法的IObservable ojlovecd

回答

0

我想不出一個內置適合您的要求的Rx運營商(S)。你可以這樣實現:

class Foo 
{ 
    private IObservable<int> observable; 
    private Queue<int> buffer = new Queue<int>(); 

    public Foo(IObservable<int> bar) 
    { 
     this.observable = bar; 

     this.observable 
      .Subscribe(item => 
      { 
       lock (buffer) 
       { 
        if (buffer.Count == 20) buffer.Dequeue(); 
        buffer.Enqueue(item); 
       } 
      }); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      lock (buffer) 
      { 
       return buffer.ToList();  // Create a copy. 
      } 
     } 
    } 
} 
2

我給你兩個選擇。其中使用的Rx Scan運營商,但我認爲,一個使得它更復雜一點閱讀。其他使用標準Queue有鎖定。你可以選擇。

(1)

class Foo 
{ 
    private int[] bars = new int[] { }; 

    public Foo(IObservable<int> bar) 
    { 
     bar 
      .Scan<int, int[]>(
       new int[] { }, 
       (ns, n) => 
        ns 
         .Concat(new [] { n, }) 
         .TakeLast(20) 
         .ToArray()) 
      .Subscribe(ns => bars = ns); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      return bars; 
     } 
    } 
} 

(2)

class Foo 
{ 
    private Queue<int> queue = new Queue<int>(); 

    public Foo(IObservable<int> bar) 
    { 
     bar.Subscribe(n => 
     { 
      lock (queue) 
      { 
       queue.Enqueue(n); 
       if (queue.Count > 20) 
       { 
        queue.Dequeue(); 
       } 
      } 
     }); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      lock (queue) 
      { 
       return queue.ToArray(); 
      } 
     } 
    } 
} 

我希望這些幫助。

+0

現在看到的是伊利安平鬆幾乎寫了同樣的'Queue'實現我會後作爲你最好的選擇,採用這種方法。我們的解決方案在這一方面有多接近,真是令人驚歎。在發佈我之前,我沒有看他。儘管如此,'Scan'運算符經常被忽略,但它在許多情況下非常有用。 – Enigmativity

+0

其實我更喜歡使用'Scan'第一種方法,它使我們有可能繼續超出可觀察鏈中的「取最後(最多)N」。實際上,將其轉換爲用於此目的的擴展方法將是非常好的,例如'var recentAverageScore = scores.TakeUpToLast(25).Average();'或者其他您想要對'IObservable '結果進行的任何其他操作。 –

0

雖然你已經得到了你的答案,我想解決與緩衝這種使用重播主題和喜歡的東西來到了:

class Foo 
{ 
    private ReplaySubject<int> replay = new ReplaySubject<int>(20); 

    public Foo(IObservable<int> bar) 
    { 
     bar.Subscribe(replay); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      var result = new List<int>(); 
      replay.Subscribe(result.Add); //Replay fill in the list with buffered items on same thread 
      return result; 
     } 
    } 
} 

讓我知道這是否適合您的問題。

1

我有一些擴展我傾向於重視我建立與反應任何擴展項目,其中之一是一個滑動窗口:

public static IObservable<IEnumerable<T>> SlidingWindow<T>(this IObservable<T> o, int length) 
{ 
    Queue<T> window = new Queue<T>(); 

    return o.Scan<T, IEnumerable<T>>(new T[0], (a, b) => 
    { 
     window.Enqueue(b); 
     if (window.Count > length) 
      window.Dequeue(); 
     return window.ToArray(); 
    }); 
} 

這返回最近的N個項目組成的數組(或更少,如果還沒有N件物品)。

對於你的情況,你應該能夠做到:

class Foo 
{ 
    private IObservable<int> observable; 
    private int[] latestWindow = new int[0]; 

    IDisposable slidingWindowSubscription; 

    public Foo(IObservable<int> bar) 
    { 
     this.observable = bar; 
     slidingWindowSubscription = this.observable.SlidingWindow(20).Subscribe(a => 
      { 
       latestWindow = a; 
      }); 
    } 

    public IEnumerable<int> MostRecentBars 
    { 
     get 
     { 
      return latestWindow; 
     } 
    } 
}