2012-07-15 16 views
5
  • 我有一個I/O密集型操作。
  • 我只想要一次運行5個線程的MAX。
  • 我有8000個任務要排隊和完成。
  • 每個任務大約需要15-20秒才能執行。

我在線程池四周看了看,但是ThreadPool挫敗 - 線程創建超過SetMaxThreads

 ThreadPool.SetMaxThreads(5, 0); 

     List<task> tasks = GetTasks(); 

     int toProcess = tasks.Count; 
     ManualResetEvent resetEvent = new ManualResetEvent(false); 

     for (int i = 0; i < tasks.Count; i++) 
     { 
      ReportGenerator worker = new ReportGenerator(tasks[i].Code, id); 
      ThreadPool.QueueUserWorkItem(x => 
      { 
       worker.Go(); 
       if (Interlocked.Decrement(ref toProcess) == 0) 
        resetEvent.Set(); 
      }); 
     } 

     resetEvent.WaitOne(); 

我想不通爲什麼......我的代碼是在同一時間執行超過5個線程。我試過setmaxthreads,setminthreads,但它一直執行超過5個線程。

發生了什麼事?我錯過了什麼?我應該以另一種方式來做這件事嗎?

謝謝

+0

您已經在調試器中驗證了** tasks.Count **的值嗎?你有沒有嘗試過把「5」放進去? – 2012-07-15 02:47:03

+0

任務數組中有〜8000個對象 – Mike 2012-07-15 02:48:00

回答

3

任務並行庫可以幫助您:

List<task> tasks = GetTasks(); 

Parallel.ForEach(tasks, new ParallelOptions { MaxDegreeOfParallelism = 5 }, 
    task => {ReportGenerator worker = new ReportGenerator(task.Code, id); 
      worker.Go();}); 

What does MaxDegreeOfParallelism do?

+0

這很簡單!像魅力一樣工作!謝謝! – Mike 2012-07-15 03:37:07

1

我認爲有一個不同的,更好的方法來解決這個問題。 (請原諒,如果我不小心Java化了一些語法)

這裏的主線程有一個在「任務」中要做的事情列表 - 而不是爲每個任務創建線程,當你有這麼多的項目,創建所需的線程數量,然後讓他們根據需要從列表中請求任務。

第一件要做的事情就是在這個代碼來自的類中添加一個變量,用作指向列表的指針。我們還將添加一個用於最大期望線程數。

// New variable in your class definition 
private int taskStackPointer; 
private final static int MAX_THREADS = 5; 

創建一個方法,該方法返回列表中的下一個任務並遞增堆棧指針。然後創建了這個新的接口:

// Make sure that only one thread has access at a time 
[MethodImpl(MethodImplOptions.Synchronized)] 
public task getNextTask() 
{ 
    if(taskStackPointer < tasks.Count) 
     return tasks[taskStackPointer++]; 
    else 
     return null; 
} 

或者,你可以返回任務[taskStackPointer ++]代碼,如果有,你可以指定爲表示「列表結束」的值。然而,這樣做可能更容易。

接口:

public interface TaskDispatcher 
{ 
    [MethodImpl(MethodImplOptions.Synchronized)] public task getNextTask(); 
} 

內ReportGenerator類,改變構造函數接受調度對象:

public ReportGenerator(TaskDispatcher td, int idCode) 
{ 
    ... 
} 

您還需要改變ReportGenerator類以便處理有一個外部循環,它通過調用td.getNextTask()來請求一個新任務,並在返回NULL時退出循環。

最後,改變線程創建代碼是這樣的:(這只是給你一個想法)

taskStackPointer = 0; 
for (int i = 0; i < MAX_THREADS; i++) 
{ 
    ReportGenerator worker = new ReportGenerator(this,id); 
    worker.Go(); 
} 

創建線程的所需數量的方式,讓他們都在最大容量工作。

(我不確定我是否使用了「[MethodImpl(MethodImplOptions.Synchronized)]」正好...我更習慣的Java比C#)

+0

感謝您花時間回答我的問題,它確實有道理。儘管如此,這種方法更加冗長:P – Mike 2012-07-15 03:36:45

+0

它可能稍微冗長一些,但是一旦找到它,它就會非常高效且易於理解。 – 2012-07-15 03:49:21

1

你的任務列表將在它8K項,因爲你說的代碼放在他們那裏:

List<task> tasks = GetTasks(); 

這就是說,這個數字已經沒有任何關係多少線程在這個意義上被使用,調試器總是要告訴你的廣告有多少項目代表名單。

有多種方法可以確定正在使用多少個線程。也許最簡單的一個就是用調試器打入應用程序並查看線程窗口。不僅你會得到一個計數,但你會看到每個線程正在做什麼(或不是),這導致我...

有關於你的任務正在做什麼以及你如何到達的重要討論在一個數字'節流'的線程池。在大多數情況下,線程池將會做正確的事情。

現在來回答您的具體問題......

要明確控制併發任務的數量,考慮一個簡單的實現,將涉及從清單BlockingCollection改變你的任務集合(也將在內部使用ConcurrentQueue)和下面的代碼爲「消費」的工作:

var parallelOptions = new ParallelOptions 
{ 
    MaxDegreeOfParallelism = 5 
}; 

Parallel.ForEach(collection.GetConsumingEnumerable(), options, x => 
{ 
    // Do work here... 
}); 

變化MaxDegreeOfParallelism,無論你已經確定併發值是適合你正在做的工作。

以下可能是你的興趣:

Parallel.ForEach Method

BlockingCollection

克里斯

3

有在那個SetMaxThreads的限制,你不能把它比數較低處理器在系統上。如果您有8個處理器,將其設置爲5與完全不調用該函數相同。