2013-06-21 110 views
1

我有下面的代碼,我從源中填充用戶,爲了舉例,如下所示。我想要做的是消費BlockingCollection與多個消費者。消費阻止具有多個任務/消費者的集合

低於正確的方式做到這一點?另外什麼是最好的線程數?好吧,這將取決於硬件,內存等。或者我怎樣才能以更好的方式做到這一點?

也會在下面的實現確保我會處理集合中的所有東西,直到它是空的?

class Program 
    { 
     public static readonly BlockingCollection<User> users = new BlockingCollection<User>(); 

     static void Main(string[] args) 
     { 
      for (int i = 0; i < 100000; i++) 
      { 
       var u = new User {Id = i, Name = "user " + i}; 
       users.Add(u); 
      } 

      Run(); 
     } 

     static void Run() 
     { 
      for (int i = 0; i < 100; i++) 
      { 
       Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning); 
      } 
     } 

     static void Process() 
     { 
      foreach (var user in users.GetConsumingEnumerable()) 
      { 
       Console.WriteLine(user.Id); 
      } 
     } 
    } 

    public class User 
    { 
     public int Id { get; set; } 
     public string Name { get; set; } 
    } 

回答

5

一些小東西

  1. 你從沒叫過CompleteAdding,不這樣做,你的消費foreach循環將永遠不會完成,並掛到永遠。通過在初始for循環後執行users.CompleteAdding()修復該問題。
  2. 你永遠都不會等待工作完成,Run()會啓動你的100個線程(除非你的真實過程需要大量的等待無爭議資源,否則這可能太多了)。因爲任務不是前臺線程,所以當您的Main退出時,它們不會讓您的程序保持打開狀態。您需要一個CountdownEvent來跟蹤何時完成所有事情。
  3. 在生產者完成所有工作之後,您纔會啓動消費者,您應該將生產者分拆到單獨的線程中,或者首先啓動消費者,以便他們隨時準備工作,同時將生產者填充到生產者上主線程。

這裏是代碼的更新版本與修復

class Program 
{ 
    private const int MaxThreads = 100; //way to high for this example. 
    private static readonly CountdownEvent cde = new CountdownEvent(MaxThreads); 
    public static readonly BlockingCollection<User> users = new BlockingCollection<User>(); 

    static void Main(string[] args) 
    { 
     Run(); 

     for (int i = 0; i < 100000; i++) 
     { 
      var u = new User {Id = i, Name = "user " + i}; 
      users.Add(u); 
     } 
     users.CompleteAdding(); 
     cde.Wait(); 
    } 

    static void Run() 
    { 
     for (int i = 0; i < MaxThreads; i++) 
     { 
      Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning); 
     } 
    } 

    static void Process() 
    { 
     foreach (var user in users.GetConsumingEnumerable()) 
     { 
      Console.WriteLine(user.Id); 
     } 
     cde.Signal(); 
    } 
} 

public class User 
{ 
    public int Id { get; set; } 
    public string Name { get; set; } 
} 

「線程的最佳數量」就像我前面說的,這真的取決於你在等待什麼。

如果你正在處理的是CPU限制,最佳線程數可能是Enviorment.ProcessorCount

如果你正在做的是等待一個外部資源,但新的要求,不影響老的請求(例如詢問信息的20臺不同的服務器,服務器對服務器n負載不會影響服務器n+1負載)的那種情況下,我會讓Parallel.ForEach只選擇你的線程數。

如果您正在等待被爭用的資源(例如讀取/寫入硬盤),則根本不想使用非常多的線程(可能甚至只使用一個)。我剛剛發佈了a answer in another question,從硬盤讀入數據時,應該只使用一個線程,這樣硬盤就不會四處跳躍,試圖一次完成所有讀取操作。

+0

如何調用cde.Signal();來自Process(),你的意思是讓這個靜態只讀實例? – DarthVader

+0

是的,我做了,我在這裏寫了代碼,而不是在調試器中。我會糾正的。 –

+0

我正在使用IO,即:從交換/ AD獲取用戶信息,但我有這麼多的用戶,我需要一些並行性,這種模式會有效嗎?或者你會推薦另一種方法? – DarthVader