2015-06-08 47 views
3

假設我有一些IEnumerator<T>MoveNext()方法中做了相當數量的處理。一個枚舉器包裝提前預先緩衝了底層枚舉數的一些項目

從該枚舉器消耗的代碼不僅消耗的速度與數據可用速度一樣快,而且偶爾會等待(具體細節與我的問題無關),以便同步它需要恢復消耗的時間。但是當它下一次呼叫MoveNext()時,它需要儘可能快的數據。

一種方法是將整個流預先用於某些列表或數組結構中以進行即時枚舉。然而,這會浪費內存,因爲在任何時間點,只有一個項目正在使用,並且在整個數據不適合內存的情況下,這將是禁止的。

所以是有什麼在.NET泛型一個包裝了枚舉/枚舉的方式,它異步預迭代底層的枚舉幾個項目提前並緩存結果所以它總是有一些其緩衝區中可用的項目和調用的MoveNext將永不需要等待?很明顯,消耗的內容,即由調用者後續的MoveNext迭代的內容將從緩衝區中移除。

N.B.我想要做的一部分也叫背壓,並且在Rx世界中,已經在RxJava中實施並正在討論中Rx.NET。 Rx(可觀察到推送數據)可以被認爲是統計員(統計員允許提取數據)的相反方法。我的回答顯示:背壓在拉動方式上相對容易,只需暫停消耗。推動時更難,需要額外的反饋機制。

+0

這看起來像您之前提出的問題的副本,您稱之爲編輯:http:// stackoverflow。com/questions/30700154/a-pre-buffering-enumerator –

+0

@Asad我刪除了舊問題並創建了這個問題,因爲對現有問題的評論與目前的問題完全不匹配。我希望這個更清楚我真正想要的。 –

+0

這個問題似乎沒有明顯改變。當用戶耗盡緩衝區時,您仍然存在預緩衝一些項目的問題,只會導致延遲時間縮短一半,這是調用MoveNext時的一半。 –

回答

2

您的自定義枚舉類更簡潔的方法是做到這一點:

public static IEnumerable<T> Buffer<T>(this IEnumerable<T> source, int bufferSize) 
{ 
    var queue = new BlockingCollection<T>(bufferSize); 

    Task.Run(() => { 
     foreach(var i in source) queue.Add(i); 
     queue.CompleteAdding(); 
    }); 

    return queue.GetConsumingEnumerable(); 
} 

這可用作:

var slowEnumerable = GetMySlowEnumerable(); 
var buffered = slowEnumerable.Buffer(10); // Populates up to 10 items on a background thread 
+0

好的解決方案。我知道並且已經使用BlockingCollection(並且在高度併發的代碼中停止使用它,因爲它[與ConcurrentQueue + AutoResetEvent比較慢](http://stackoverflow.com/a/29269149/709537))需要很快,但是我不知道'GetConsumingEnumerable',在這個簡單的生產者/消費者場景中,它看起來像是一個完美的解決方案。所以我不必自己推出自己的產品。 –

+0

@EugeneBeresovsky :)。只是我覺得這裏的最佳緩衝區大小是你最終計劃消費的任何東西(即緩衝所有東西,而不是前面的一小段距離)。我的意思是,如果你正在產生另一個線程,讓它繼續工作,儘可能地做好準備。 –

+0

@EugeneBeresovsky不知道BlockingCollection很慢,但你可以使用'ConcurrentQueue',並鎖定大小檢查與'Add'同步。 –

0

有實現這個自己不同的方式,我決定用每枚舉,做異步預緩衝

  • 元素固定數量的預先緩衝
    • 一個專門的線程

    這對我手邊的情況來說是完美的(只有少數很長的普查員),但是例如如果你使用大量的枚舉器,創建一個線程可能太重了,如果你需要更動態的東西,固定數量的元素可能太不靈活了,或許根據項目的實際內容。

    我到目前爲止只測試過它的主要特徵,並且可能會留下一些粗糙的邊緣。它可以像這樣使用:

    int bufferSize = 5; 
    IEnumerable<int> en = ...; 
    foreach (var item in new PreBufferingEnumerable<int>(en, bufferSize)) 
    { 
        ... 
    

    這裏是枚舉的要點:

    class PreBufferingEnumerator<TItem> : IEnumerator<TItem> 
    { 
        private readonly IEnumerator<TItem> _underlying; 
        private readonly int _bufferSize; 
        private readonly Queue<TItem> _buffer; 
        private bool _done; 
        private bool _disposed; 
    
        public PreBufferingEnumerator(IEnumerator<TItem> underlying, int bufferSize) 
        { 
         _underlying = underlying; 
         _bufferSize = bufferSize; 
         _buffer = new Queue<TItem>(); 
         Thread preBufferingThread = new Thread(PreBufferer) { Name = "PreBufferingEnumerator.PreBufferer", IsBackground = true }; 
         preBufferingThread.Start(); 
        } 
    
        private void PreBufferer() 
        { 
         while (true) 
         { 
          lock (_buffer) 
          { 
           while (_buffer.Count == _bufferSize && !_disposed) 
            Monitor.Wait(_buffer); 
           if (_disposed) 
            return; 
          } 
          if (!_underlying.MoveNext()) 
          { 
           lock (_buffer) 
            _done = true; 
           return; 
          } 
          var current = _underlying.Current; // do outside lock, in case underlying enumerator does something inside get_Current() 
          lock (_buffer) 
          { 
           _buffer.Enqueue(current); 
           Monitor.Pulse(_buffer); 
          } 
         } 
        } 
    
        public bool MoveNext() 
        { 
         lock (_buffer) 
         { 
          while (_buffer.Count == 0 && !_done && !_disposed) 
           Monitor.Wait(_buffer); 
          if (_buffer.Count > 0) 
          { 
           Current = _buffer.Dequeue(); 
           Monitor.Pulse(_buffer); // so PreBufferer thread can fetch more 
           return true; 
          } 
          return false; // _done || _disposed 
         } 
        } 
    
        public TItem Current { get; private set; } 
    
        public void Dispose() 
        { 
         lock (_buffer) 
         { 
          if (_disposed) 
           return; 
          _disposed = true; 
          _buffer.Clear(); 
          Current = default(TItem); 
          Monitor.PulseAll(_buffer); 
         } 
        }