2011-07-06 20 views
5

我有多個生產者和單個消費者的基本默認併發隊列。但是,如果隊列中還沒有被消耗的東西,生產者就不應該再次排隊。 (獨特的無重複阻塞集合,使用默認的併發隊列)如何訪問阻塞集合

if (!myBlockingColl.Contains(item)) 
    myBlockingColl.Add(item) 

但是阻塞科爾沒有包含方法也不提供任何形式trypeek()方法一樣的。我怎樣才能訪問底層併發隊列,所以我可以做類似

if (!myBlockingColl.myConcurQ.trypeek(item) 
    myBlockingColl.Add(item) 

在尾旋轉。請幫忙。感謝

+2

如果有這樣的方法,那麼它將不得不是一個單一的原子方法。它永遠不會被執行,因爲你的兩個摘錄是因爲anothe線程可能會在TryPeek和Add之間添加。我認爲你運氣不好。無論如何,你爲什麼需要這樣的能力? –

+0

在我的情況下,如果另一個線程在trypeek和add之間添加,則可以。我主要是這樣做的,作爲安全檢查,調度程序在多個線程(因此有多個生產者)觸發報告生成觸發器。如果無論出於何種原因調度程序出現故障並且在短時間內多次觸發相同的觸發器,我只想處理。明白我可以解決並以某種方式處理它。看起來好像沒有優雅的處理方式。謝謝 – Gullu

回答

1

我建議實施與鎖定您的操作,讓你不讀,並在破壞它的方式寫的項目,使它們的原子。例如,使用任何IEnumerable:

object bcLocker = new object(); 

// ... 

lock (bcLocker) 
{ 
    bool foundTheItem = false; 
    foreach (someClass nextItem in myBlockingColl) 
    { 
     if (nextItem.Equals(item)) 
     { 
      foundTheItem = true; 
      break; 
     } 
    } 
    if (foundTheItem == false) 
    { 
     // Add here 
    } 
} 
+1

瞭解它可以通過鎖定老式的方式來完成。由於我使用的是新的網絡4類,大多數情況下工作良好,沒有明確的鎖定,我希望有更酷的東西。謝謝 – Gullu

+0

沒有「更酷」的方式 - 沒有鎖定就沒有任何工作。即使兩個添加可能會發生衝突。 –

+0

集合上的每一個操作都可以改變它需要保護的集合上的這個lockevery操作,可以改變它需要用這個鎖保護 –

7

這是一個有趣的問題。這是我第一次見到有人要求阻止重複的阻止隊列。奇怪的是,我找不到你想要的東西在BCL中已經存在。我說這是奇怪,因爲BlockingCollection可以接受IProducerConsumerCollection因爲它具有TryAdd方法被公佈爲能夠檢測到重複時失敗底層集合。問題是,我沒有看到IProducerConsumerCollection的具體實現,防止重複。至少我們可以寫我們自己的。現在

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T> 
{ 
    // TODO: You will need to fully implement IProducerConsumerCollection. 

    private Queue<T> queue = new Queue<T>(); 

    public bool TryAdd(T item) 
    { 
    lock (queue) 
    { 
     if (!queue.Contains(item)) 
     { 
     queue.Enqueue(item); 
     return true; 
     } 
     return false; 
    } 
    } 

    public bool TryTake(out T item) 
    { 
    lock (queue) 
    { 
     item = null; 
     if (queue.Count > 0) 
     { 
     item = queue.Dequeue(); 
     } 
     return item != null; 
    } 
    } 
} 

,我們有我們IProducerConsumerCollection不接受重複利用它我們可以這樣:

public class Example 
{ 
    private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>()); 

    public Example() 
    { 
    new Thread(Consume).Start(); 
    } 

    public void Produce(object item) 
    { 
    bool unique = queue.TryAdd(item); 
    } 

    private void Consume() 
    { 
    while (true) 
    { 
     object item = queue.Take(); 
    } 
    } 
} 

你可以不喜歡我實施NoDuplicatesConcurrentQueue。如果您認爲您需要TPL收藏提供的低鎖性能,您當然可以自由使用ConcurrentQueue或其他任何方式實施自己的產品。

更新:

我能夠今天早上來測試代碼。有一些好消息和壞消息。好消息是,這將在技術上起作用。壞消息是,你可能不希望這樣做,因爲BlockingCollection.TryAdd攔截來自底​​層IProducerConsumerCollection.TryAdd方法的返回值,當檢測false拋出異常。是的,那是對的。它不會像您所期望的那樣返回false,而是會生成一個異常。我必須說實話,這是令人驚訝和可笑的。 TryXXX方法的重點在於它們不應該拋出異常。我深感失望。

+0

Brian,認爲你是男人。讓我消化這個,如果我找到了答案,我會在明天更新。謝謝 – Gullu

+0

整個IProducerConsumerCollection需要實施的事實讓我重新發明了車輪的感覺。一定會有更好的辦法。 Upvoted你的解決方案,因爲我最終可能會做類似的事情。謝謝 – Gullu

+0

@Gullu:我剛剛更新了我的答案。無論如何,我不認爲你會想要使用這個策略。閱讀我的更新。 –

4

除了布萊恩·基甸後提到的更新的警告,該解決方案從這些性能問題受到影響:

  • O(n)的隊列(queue.Contains(item))會對性能產生嚴重的影響操作作爲隊列增長
  • 鎖限制併發性(這是他未提到)

下面的代碼改善業務收益指數使用散列組由

  • 的溶液做O(1)查找
  • System.Collections.Concurrent命名空間組合2層數據結構

N.B.由於沒有ConcurrentHashSet,我正在使用ConcurrentDictionary,忽略了這些值。

在這種罕見的情況下,幸運地可以在不添加鎖的情況下簡單地編寫出多個更簡單的併發數據結構。這兩個併發數據結構的操作順序非常重要。

public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T> 
{ 
    private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>(); 
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 

    public bool TryAdd(T item) 
    { 
     if (existingElements.TryAdd(item, false)) 
     { 
      queue.Enqueue(item); 
      return true; 
     } 
     return false; 
    } 

    public bool TryTake(out T item) 
    { 
     if (queue.TryDequeue(out item)) 
     { 
      bool _; 
      existingElements.TryRemove(item, out _); 
      return true; 
     } 
     return false; 
    } 
    ... 
} 

N.B.查看此問題的另一種方法:您希望保留插入順序