2012-10-26 25 views
4

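Multiple threads accessing an IEnumerable that uses yield

I am using a third-party library to iterate over some very large flat files, which can take several minutes. The library provides an enumerator, so you can yield each result and process it while the enumerator fetches the next item from the flat file.

For example: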

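IEnumerable<object> GetItems()
{
    var cursor = new Cursor();

    try
    {
        cursor.Open();

        while (!cursor.EOF)
        {
            // Placeholder: build the result object from the cursor's current record
            yield return new object();

            cursor.MoveNext();
        }
    }
    finally
    {
        // Runs when enumeration finishes or the enumerator is disposed early
        if (cursor.IsOpen)
        {
            cursor.Close();
        }
    }
}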

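What I want is to have two consumers of the same enumerable, so that I do not have to extract the information twice, while each consumer can still process every item as it arrives instead of waiting for the whole file to be read first.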

IEnumerable<object> items = GetItems(); 

new Thread(() => SaveToDatabase(items)).Start();
new Thread(() => SaveSomewhereElse(items)).Start();

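I guess what I am trying to achieve is something like:

"If the item a consumer asks for has already been extracted, yield it; otherwise move next and wait." But I realize that the MoveNext() calls could clash between the two threads.

Does something like this already exist? If not, any thoughts on how it could be achieved?

Thanks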

Answers

2

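Basically, what you want is to cache the data of an IEnumerable<T>, but without waiting for the sequence to finish before storing it. You can do something like this: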

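// Namespaces needed: System.Collections and System.Collections.Generic
// (plus System, System.Threading and System.Threading.Tasks for the usage example).
// The members below are meant to live inside a non-generic static class,
// which C# requires for extension methods.
public static IEnumerable<T> Cache<T>(this IEnumerable<T> source) 
{ 
    return new CacheEnumerator<T>(source); 
} 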

private class CacheEnumerator<T> : IEnumerable<T> 
{ 
    private CacheEntry<T> cacheEntry; 
    public CacheEnumerator(IEnumerable<T> sequence) 
    { 
     cacheEntry = new CacheEntry<T>(); 
     cacheEntry.Sequence = sequence.GetEnumerator(); 
     cacheEntry.CachedValues = new List<T>(); 
    } 

    public IEnumerator<T> GetEnumerator() 
    { 
     if (cacheEntry.FullyPopulated) 
     { 
      return cacheEntry.CachedValues.GetEnumerator(); 
     } 
     else 
     { 
      return iterateSequence<T>(cacheEntry).GetEnumerator(); 
     } 
    } 

    IEnumerator IEnumerable.GetEnumerator() 
    { 
     return this.GetEnumerator(); 
    } 
} 

private static IEnumerable<T> iterateSequence<T>(CacheEntry<T> entry) 
{ 
    for (int i = 0; entry.ensureItemAt(i); i++) 
    { 
     yield return entry.CachedValues[i]; 
    } 
} 

private class CacheEntry<T> 
{ 
    public bool FullyPopulated { get; private set; } 
    public IEnumerator<T> Sequence { get; set; } 

    //the values pulled from the sequence so far, in their original order. 
    public List<T> CachedValues { get; set; } 

    //one lock per cache, so unrelated caches of the same T do not contend. 
    private readonly object key = new object(); 
    /// <summary> 
    /// Ensure that the cache has an item at the provided index. If not, take an item from the 
    /// input sequence and move to the cache. 
    /// 
    /// The method is thread safe. 
    /// </summary> 
    /// <returns>True if the cache already had enough items or 
    /// an item was moved to the cache, 
    /// false if there were no more items in the sequence.</returns> 
    public bool ensureItemAt(int index) 
    { 
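     //fast path: if the cache already has the item we skip the lock. 
     //caveat: List<T> is not thread safe for a read concurrent with a write, 
     //so this unlocked check can race with an Add on another thread; take 
     //the lock first if strict safety matters more than speed. 
     if (index < CachedValues.Count) 
      return true; 
     //if we're done, nothing will be added any more 
     if (FullyPopulated) 
      return false; 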

     lock (key) 
     { 
      //re-check the early-exit conditions in case they changed while we were 
      //waiting on the lock. 

      //we already have the cached item 
      if (index < CachedValues.Count) 
       return true; 
      //we don't have the cached item and there are no uncached items 
      if (FullyPopulated) 
       return false; 

      //we actually need to get the next item from the sequence. 
      if (Sequence.MoveNext()) 
      { 
       CachedValues.Add(Sequence.Current); 
       return true; 
      } 
      else 
      { 
       Sequence.Dispose(); 
       FullyPopulated = true; 
       return false; 
      } 
     } 
    } 
} 

Usage example:

private static IEnumerable<int> interestingIntGenertionMethod(int maxValue) 
{ 
    for (int i = 0; i < maxValue; i++) 
    { 
     Thread.Sleep(1000); 
     Console.WriteLine("actually generating value: {0}", i); 
     yield return i; 
    } 
} 

public static void Main(string[] args) 
{ 
    IEnumerable<int> sequence = interestingIntGenertionMethod(10) 
     .Cache(); 

    int numThreads = 3; 
    for (int i = 0; i < numThreads; i++) 
    { 
     int taskID = i; 
     Task.Factory.StartNew(() => 
     { 
      foreach (int value in sequence) 
      { 
       Console.WriteLine("Task: {0} Value:{1}", 
        taskID, value); 
      } 
     }); 
    } 

    Console.WriteLine("Press any key to exit..."); 
    Console.ReadKey(true); 
} 
5

A Pipelines pattern implementation using the .NET 4 BlockingCollection<T> and TPL tasks is what you are looking for. See my answer in this StackOverflow post for a complete example.

Example: 3 simultaneous consumers

BlockingCollection<string> queue = new BlockingCollection<string>();  
public void Start() 
{ 
    var producerWorker = Task.Factory.StartNew(() => ProducerImpl()); 
    var consumer1 = Task.Factory.StartNew(() => ConsumerImpl()); 
    var consumer2 = Task.Factory.StartNew(() => ConsumerImpl()); 
    var consumer3 = Task.Factory.StartNew(() => ConsumerImpl()); 

    Task.WaitAll(producerWorker, consumer1, consumer2, consumer3); 
} 

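private void ProducerImpl() 
{ 
    // 1. Read raw data from a file 
    // 2. Preprocess it 
    // 3. Add each item to the queue (item stands for one preprocessed record) 
    queue.Add(item); 

    // 4. Signal that no more items will be added; without this the consumers' 
    //    GetConsumingEnumerable() loops never end and Task.WaitAll blocks forever 
    queue.CompleteAdding(); 
} 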

// ConsumerImpl must be thread safe 
// to allow launching multiple consumers simultaneously 
private void ConsumerImpl() 
{ 
    foreach (var item in queue.GetConsumingEnumerable()) 
    { 
     // TODO 
    } 
} 

If anything is still unclear, please let me know.

High-level diagram of the pipeline flow:

(image not included)
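
One point worth keeping in mind about the BlockingCollection<T> example above: GetConsumingEnumerable() removes items as it yields them, so with a single shared queue each item reaches exactly one consumer, whereas the question asks for every consumer to see every item. A minimal sketch of one possible way to get that, assuming one bounded queue per consumer that the producer writes each item into. FanOutSketch, Broadcast and the capacity of 100 are illustrative names and values, not part of the answer above or of any library:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutSketch
{
    // Hypothetical helper: enumerates `source` once and hands every item
    // to each of `consumerCount` consumers through its own queue.
    public static List<T>[] Broadcast<T>(IEnumerable<T> source, int consumerCount)
    {
        // One bounded queue per consumer; the bound (100, an arbitrary value)
        // keeps the producer from running far ahead of a slow consumer.
        BlockingCollection<T>[] queues = Enumerable.Range(0, consumerCount)
            .Select(_ => new BlockingCollection<T>(100))
            .ToArray();

        // Each consumer drains only its own queue, so it sees every item in
        // source order. Here a consumer just collects the items into a list;
        // real processing would go inside a foreach over the queue.
        Task<List<T>>[] consumers = queues
            .Select(q => Task.Factory.StartNew(
                () => q.GetConsumingEnumerable().ToList()))
            .ToArray();

        try
        {
            // The source is enumerated exactly once, on the calling thread.
            foreach (T item in source)
            {
                foreach (BlockingCollection<T> q in queues)
                {
                    q.Add(item);
                }
            }
        }
        finally
        {
            // Always complete the queues so the consumer loops can end.
            foreach (BlockingCollection<T> q in queues)
            {
                q.CompleteAdding();
            }
        }

        Task.WaitAll(consumers);
        return consumers.Select(t => t.Result).ToArray();
    }

    public static void Main()
    {
        List<int>[] results = Broadcast(Enumerable.Range(1, 5), 2);
        for (int i = 0; i < results.Length; i++)
        {
            Console.WriteLine("Consumer {0}: {1}", i, string.Join(",", results[i]));
        }
    }
}
```

The trade-off in this sketch is that the slowest consumer sets the pace once its queue is full, and a consumer that stops reading would leave the producer blocked in Add, so production code would likely add cancellation or timeouts.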

+0

Haha... nice answer.... – Anirudha

+0

If anything is unclear, please ask – sll

+0

Any explanation for the downvote? – sll