2014-06-27 245 views
3

我試圖實現一個相當簡單的生產者/消費者風格的應用程序與多個生產者和一個消費者。生產者/消費者模式與批生產者

研究使我到BlockingCollection<T>這是有用的,讓我實現如下一個長期運行的消費者任務:

var c1 = Task.Factory.StartNew(() => 
{ 
    var buffer = new List<int>(BATCH_BUFFER_SIZE); 

    foreach (var value in blockingCollection.GetConsumingEnumerable()) 
    { 
     buffer.Add(value); 
     if (buffer.Count == BATCH_BUFFER_SIZE) 
     { 
      ProcessItems(buffer); 
      buffer.Clear(); 
     } 
    } 
}); 

ProcessItems函數提交緩衝到數據庫和它在批量工作。然而,這個解決方案是次優的。在男爵製作期間,可能需要一段時間才能填充緩衝區,這意味着數據庫已過期。

更理想的解決方案是在30秒計時器上運行該任務或用超時短接foreach

我跑了定時器的想法,並與此想出了:

syncTimer = new Timer(new TimerCallback(TimerElapsed), blockingCollection, 5000, 5000); 

private static void TimerElapsed(object state) 
{ 
    var buffer = new List<int>(); 
    var collection = ((BlockingCollection<int>)state).GetConsumingEnumerable(); 

    foreach (var value in collection) 
    { 
     buffer.Add(value); 
    } 

    ProcessItems(buffer); 
    buffer.Clear(); 
} 

這具有foreach將被阻止,直到最後,擊敗定時器的目的明確的問題。

任何人都可以提供一個方向嗎?我基本上需要定期對BlockingCollection進行快照,並處理內容。也許BlockingCollection是錯誤的類型?

回答

5

而不是在定時器回調中使用GetConsumingEnumerable,而是使用其中的一種方法,將結果添加到列表中,直到它返回false或者您已達到令人滿意的批處理大小。

BlockingCollection.TryTake Method (T) - 可能你需要什麼,你根本不想進行進一步的等待。

BlockingCollection.TryTake Method (T, Int32)

BlockingCollection.TryTake Method (T, TimeSpan)

您可以輕鬆地提取出到擴展(未經測試):

public static IList<T> Flush<T> 
(this BlockingCollection<T> collection, int maxSize = int.MaxValue) 
{ 
    // Argument checking. 

    T next; 
    var result = new List<T>(); 

    while(result.Count < maxSize && collection.TryTake(out next)) 
    { 
     result.Add(next); 
    } 

    return result; 
} 
+0

我這個實現這完美的作品去了。它每隔x秒運行一次,只推送可用的(不等待)。謝謝你的幫助。 https://gist.github.com/AntSwift/d2faa4f43d1a6d594172 –

+0

一個完整的工作版本現在可在https://github.com/AntSwift/AS.BlockingCollectionDemo –

+0

我相信你的代碼示例會受到競爭條件的影響,因爲定時器機制。如果您的消費者無法在計時器期限內完成任務,則計時器將觸發而不會導致產生另一個消費者。 –