2010-12-16 70 views
6

我有一個生產者/消費者隊列,除了有特定類型的對象。因此,不只是任何消費者都可以消費額外的物品。我不想爲每種類型制定特定的隊列,因爲數量太多。 (它有點延伸了生產者/消費者的定義,但我不確定正確的術語是什麼。)C#producer/consumer/observer?

有沒有像EventWaitHandle那樣的允許帶參數的脈衝的東西?例如myHandle.Set(AddedType = "foo")。現在我正在使用Monitor.Wait,然後每個消費者檢查是否脈衝實際上是針對他們的,但這似乎毫無意義。

什麼,我現在有一個pseduocode版本:

class MyWorker { 
    public string MyType {get; set;} 
    public static Dictionary<string, MyInfo> data; 

    public static void DoWork(){ 
     while(true){ 
      if(Monitor.Wait(data, timeout)){ 
        if (data.ContainsKey(MyType)){ 
         // OK, do work 
        } 
      } 
     } 
    } 
} 

正如你所看到的,我可能會在其他的東西被添加到字典獲取脈衝。我只關心何時將MyType添加到字典中。有沒有辦法做到這一點?這不是什麼大問題,但是,例如,我現在必須手動處理超時,因爲每次獲得鎖都會在超時內成功,但MyType永遠不會添加到timeout內的字典中。

+0

只是一個想法,每個對象類型都有它自己的鎖對象,所以你可以'Monitor.Pulse'適當的對象? – 2010-12-16 16:00:00

+0

@deltreme:是的,這是可能的,但我不想爲每種類型創建一個新的對象。 – Xodarap 2010-12-16 16:02:11

+0

我不確定你的意思是太多的類型爲每個隊列單獨排隊。怎麼會這樣?這是實現它的自然方式,我不確定有多少單獨的隊列比其他任何事件處理機制效率低。 – 2010-12-16 16:10:33

回答

3

這是一個有趣的問題。這聽起來像解決方案的關鍵是priority queue的阻止變體。 Java有PriorityBlockingQueue,但不幸的是,.NET BCL的等價物不存在。然而,一旦你有了它,實現起來很簡單。

class MyWorker 
{ 
    public string MyType {get; set;} 
    public static PriorityBlockingQueue<string, MyInfo> data; 

    public static void DoWork() 
    { 
     while(true) 
     { 
      MyInfo value; 
      if (data.TryTake(MyType, timeout, out value)) 
      { 
       // OK, do work 
      } 
     } 
    } 
} 

實施PriorityBlockingQueue並不是非常困難。跟BlockingCollection採用AddTake樣式方法相同的模式,我想出了下面的代碼。

public class PriorityBlockingQueue<TKey, TValue> 
{ 
    private SortedDictionary<TKey, TValue> m_Dictionary = new SortedDictionary<TKey,TValue>(); 

    public void Add(TKey key, TValue value) 
    { 
     lock (m_Dictionary) 
     { 
      m_Dictionary.Add(key, value); 
      Monitor.Pulse(m_Dictionary); 
     } 
    } 

    public TValue Take(TKey key) 
    { 
     TValue value; 
     TryTake(key, TimeSpan.FromTicks(long.MaxValue), out value); 
     return value; 
    } 

    public bool TryTake(TKey key, TimeSpan timeout, out TValue value) 
    { 
     value = default(TValue); 
     DateTime initial = DateTime.UtcNow; 
     lock (m_Dictionary) 
     { 
      while (!m_Dictionary.TryGetValue(key, out value)) 
      { 
       if (m_Dictionary.Count > 0) Monitor.Pulse(m_Dictionary); // Important! 
       TimeSpan span = timeout - (DateTime.UtcNow - initial); 
       if (!Monitor.Wait(m_Dictionary, span)) 
       { 
        return false; 
       } 
      } 
      m_Dictionary.Remove(key); 
      return true; 
     } 
    } 
} 

這是一個快速實施,它有幾個問題。首先,我還沒有測試過它。其次,它使用紅黑樹(通過SortedDictionary)作爲基礎數據結構。這意味着TryTake方法將具有O(log(n))複雜性。優先級隊列通常具有O(1)去除複雜性。典型的優先級隊列數據結構是heap,但我發現skip lists在實踐中實際上更好,原因有幾個。 .NET BCL中都沒有這些,這就是爲什麼我使用了SortedDictionary,儘管在這種情況下性能較差。

我在這裏應該指出,這實際上並沒有解決無意義的Wait/Pulse行爲。它只是封裝在PriorityBlockingQueue類中。但是,至少這肯定會清理你的代碼的核心部分。

它似乎沒有像您的代碼每個鍵處理多個對象,但添加到字典時使用Queue<MyInfo>而不是簡單的舊MyInfo可以很容易地添加。

+0

有趣的,好主意來封裝這個。是否有你使用'SortedDictionary'而不是'Dictionary'的理由?只需更快的插入時間? – Xodarap 2010-12-16 20:28:45

+0

@ Xodarap:恩,好問題。我原本以爲你需要一個'Take'重載,它不接受一個鍵,它會從字典中刪除第一項。你需要一個有序的字典來正確地做到這一點。這實際上是一個優先隊列如何工作。我想你可以使用普通的舊字典,只需調用封裝類「BlockingDictionary」即可。很好的接收。 – 2010-12-16 22:00:47

1

看起來好像你想要將生產者/消費者隊列與觀察者模式結合起來 - 通用消費者線程從隊列中讀取,然後將事件傳遞給所需的代碼。在這種情況下,您不會實際發信號給觀察者,只是在消費者線程識別誰對給定工作項目感興趣時才調用它。

.Net中的觀察者模式通常使用C#事件實現。您只需要調用該對象的事件處理程序,並通過它調用一個或多個觀察者。目標代碼首先必須將自己註冊到觀察對象上,並將其自身添加到事件中以便在工作到達時發出通知。

+0

當'type'更新時,這聽起來像我應該有一個'Dictionary ',並且只做'Monitor.Pulse(Dictionary [type])'?看來我可以刪除觀察者,然後從生產者手動執行此操作。 (我同意這樣做,但我希望我可以使用幕後的消費者集合。) – Xodarap 2010-12-16 17:02:02

+0

如果您使用Monitor來激活目標對象,那麼通過隊列訪問單獨的消費者線程似乎是多餘的, 當然。 – 2010-12-16 19:13:26