3

我有一個BlockingCollection(ConcurrentBag,50000),我正在嘗試爲生產者線程使用50,000的非常小的有界容量,以最大化我可以在我的消費者中處理的記錄數線程的ConcurrentDictionary。生產者比消費者要快得多,否則會消耗大部分內存。Can Bound CrockingCollections在添加期間丟失數據

不幸的是,我立即注意到,當我的測試數據執行時,我的ConcurrentDictionary中的記錄總數現在大大低於應該添加50,000的有限容量之後。我讀過BlockingCollection的.add方法應該無限期地阻塞,直到集合中有足夠的空間來執行添加。但是,這似乎並非如此。

問題:

  1. 將一個BlockingCollection的。新增方法最終超時或如果有太多的添加的之前在BlockingCollection能力釋放了被稱爲靜默失敗?

  2. 如果#1的答案爲是,那麼在超過邊界容量而不丟失數據後,我可以嘗試多少次增加?

  3. 如果調用了許多BlockingCollection .add()方法,它們正在等待/阻塞容量並調用CompleteAdding()方法,那麼這些等待/阻塞添加是否會繼續等待,然後最終添加或者它們是否會以靜默方式失敗?

+0

你有你的收藏50,000對象?哇!無論如何,如果Blocking Collection在任何操作下「無聲無息」,沒有阻擋或投擲失敗,我會感到非常驚訝。如果確實如此,那是非常糟糕的。 –

+0

@Martin,當我在添加有界容量之前開始追蹤它時,我已經達到了700萬的最大值:)這是一個非常大的過程,消費者比生產者快得多。 –

+0

聽起來像你需要一些流量控制。 –

回答

7

確保,如果您一起使用BlockingCollection與ConcurrentDictionary,你不必藏在你的代碼某處BlockingCollection.TryAdd(myObject的)方法,並誤認爲的ConcurrentDictionary.TryAdd()方法。如果BlockingCollection的Bounding Capacity已被超過,BlockingCollection.TryAdd(myobject)將返回false並放棄產生「無提示失敗」的添加請求。

  1. BlockingCollection的.Add()方法在超過邊界容量之後不會顯示爲「沉默失敗」或失去大量添加。如果太多的.add()正在等待添加到超過容量的BlockingCollection,則add()方法最終將導致進程耗盡內存。 (這將是流量控制問題的極端情況)
  2. 請參閱#1。
  3. 我自己的測試似乎表明,一旦CompleteAdding()方法被調用,所有後續添加失敗,如MSDN文檔中所述。

最後要注意的關於性能

看來,(我自己的情況反正)使用上BlockingCollection一個邊界容量和。新增()是非常緩慢相比,使用無邊界容量和。 TryAdd()在同一個進程中。

通過實施我自己的Bounding Capacity策略,我獲得了更好的性能結果。有很多方法可以做到這一點。三種選擇包括與Monitor.PulseAll()一起使用的Thread.Sleep(),Thread.Spinwait()或Monitor.Wait()。當使用這些策略中的一種時,也可以使用BlockingCollection.TryAdd()而不是BlockingCollection.Add(),並且在沒有丟失任何數據或內存不足的情況下具有無限制容量。這種方法似乎也能產生更好的性能。

您可以從三個示例中選擇哪個方案最適合生產者和消費者線程中的速度差異。

Thread.Wait()實施例:

//Check to see if the BlockingCollection's bounded capacity has been exceeded. 
while (Tokens.Count > 50000) 
{ //If the bounded capacity has been exceeded 
    //place the thread in wait mode 
    Thread.Sleep(SleepTime); 
} 

Thread.SpinWait()實施例:

//Check to see if the BlockingCollection's bounded capacity has been exceeded. 
while (Tokens.Count > 50000) 
{ //If the capacity has been exceeded 
    //place the thread in wait mode 
    Thread.SpinWait(SpinCount); 
} 

Monitor.Wait()實施例

這例子需要在Prod中都有鉤子ucer和消費者雙方。

生產者代碼

//Check to see BlockingCollection capacity has been exceeded. 
if (Tokens.Count > 50000) 
{ 
    lock (syncLock) 
    { //Double check before waiting 
     if (Tokens.Count > 50000) 
     { 
      Monitor.Wait(syncLock, 1000); 
     } 
    } 
} 

消費者法典

//Check to see BlockingCollection capacity is back a normal range. 
if (Tokens.Count <= 40000) 
{ 
    lock (syncLock) 
    { //Double check before waiting 
     if (Tokens.Count < 40000) 
     { 
      Monitor.PulseAll(syncLock); 
     } 
    } 
}