2009-04-28 81 views
5

我有什麼,我認爲是一個很常見的線程的情景:線程隊列傻瓜

  • 我有100頁相同的作業完成
  • 所有作業是相互獨立的各個 其他
  • 我要處理同時
  • 由於每個作業 完成最多的 15的工作,新的工作將開始 ,直到所有的工作已經完成

如果您假設每個作業在他完成時(我使用BackgroundWorker類)都會觸發一個事件,我可以想到幾種方法來解決這個問題,但我不確定「正確「的解決方案是。我希望你們中的一些專家能指出我的方向。

解決方案1:有一個while(continue){Threading.Sleep(1000);在我的Main()函數中循環}。當Job_Completed事件處理程序中的代碼將設置continue = false時A)沒有作業剩餘排隊和B)所有排隊的作業已完成。我之前和之後都使用過這個解決方案,看起來工作正常......對我來說似乎有點「奇怪」。

解決方案2:在我的Main()函數中使用Application.Run()。同樣,Job_Completed事件處理程序中的代碼會在A)沒有作業保留排隊時,調用Application.Exit(),並且B)所有排隊的作業都已完成。解決方案3:使用ThreadPool,排隊所有500-1000個請求,讓他們一次運行10個(SetMaxThreads),並以某種方式等待它們全部完成。

在所有這些解決方案中,基本思想是每次完成另一項工作時都會開始一項新工作,直到沒有剩餘工作。所以,問題不僅在於等待現有的工作完成,而且還在等待直到不再有待處理的工作開始。如果ThreadPool是正確的解決方案,那麼等待ThreadPool完成所有排隊項目的正確方法是什麼?

我想我最重要的困惑在於,我不明白HOW事件是否能夠從我的Main()函數中觸發。顯然他們這樣做,我只是不明白從Windows消息循環的角度來看它的機制。解決這個問題的正確方法是什麼?爲什麼?

+0

看起來大多數建議都圍繞着一個ThreadPool風格的解決方案......我上面提出的解決方案1和解決方案2(基本上等待被正在觸發的事件修改的條件)呢?這樣做是否有內在的錯誤,還是僅僅是因爲.NET提供了ThreadPool,所以它沒有必要? 看起來很奇怪的代碼是這樣的: while(continue) Threading.Sleep(1000); ...等待事件在Main()函數中觸發。在像這樣的代碼中......當我的事件得到處理時......在Sleep()調用中的某個地方? – 2009-04-28 15:10:55

回答

0

這裏是我將如何接近它的僞代碼(這不利用線程池,所以有人可能有一個更好的答案:)

main 
{ 
    create queue of 100 jobs 
    create new array of 15 threads 
    start threads, passing each the job queue 
    do whatever until threads are done 
} 

thread(queue) 
{ 
    while(queue isn't empty) 
    { 
     lock(queue) { if queue still isn't empty dequeue a thing } 
     process the thing 
    } 

    queue is empty so exit thread 
} 

編輯:如果你的問題是如何知道何時線程完成,並且您正在使用正常的C#線程(不是ThreadPooled線程),您可以使用可選超時在每個線程上調用Thread.Join(),並且只有在線程完成後纔會返回。如果你想保持跟蹤有多少線程而不會掛了一個完成,可以循環通過他們在這樣的方式:

for(int i = 0; allThreads.Count > 0; i++) 
{ 
    var thisThread = allThreads[i % threads.Count]; 
    if(thisThread.Join(timeout)) // something low, maybe 100 ms or something 
     allThreads.Remove(thisThread); 
} 
2

回覆:「莫名其妙地等待他們全部完成」

ManualResetEvent是你的朋友,在你開始你的大批量創建這些小狗之一之前,在你的主線程中等待它,在作業完成後將它設置在後臺操作的末尾。

另一種方法是手動創建線程,做一個foreach線程的Thread.join()

你可以使用這個(我在測試過程中使用)

 private void Repeat(int times, int asyncThreads, Action action, Action done) { 
     if (asyncThreads > 0) { 

      var threads = new List<Thread>(); 

      for (int i = 0; i < asyncThreads; i++) { 

       int iterations = times/asyncThreads; 
       if (i == 0) { 
        iterations += times % asyncThreads;      
       } 

       Thread thread = new Thread(new ThreadStart(() => Repeat(iterations, 0, action, null))); 
       thread.Start(); 
       threads.Add(thread); 
      } 

      foreach (var thread in threads) { 
       thread.Join(); 
      } 

     } else { 
      for (int i = 0; i < times; i++) { 
       action(); 
      } 
     } 
     if (done != null) { 
      done(); 
     } 
    } 

用法:

// Do something 100 times in 15 background threads, wait for them all to finish. 
Repeat(100, 15, DoSomething, null) 
1

我只是使用任務並行庫。

您可以將它作爲一個單一的,簡單的Parallel.For循環與您的任務,它會自動管理這相當乾淨。如果您不能等待C#4和Microsoft的實現,臨時解決方法是編譯並使用Mono Implementation of TPL。 (我個人比較喜歡MS的實現,特別是較新的beta版本,但是Mono是當今的功能和可再發行版本。)

0

當您在線程隊列中排隊工作項時,應該返回一個等待句柄。把它們放在一個數組中,你可以把它作爲參數傳遞給WaitAll()函數。

+0

好主意,但你會如何做到這一點? QueueUserWorkItem返回一個布爾值。 – Grokys 2009-04-28 01:14:13

1

我會使用ThreadPool。

在開始運行作業之前,請創建一個ManualResetEvent和一個int計數器。將每個作業添加到ThreadPool,每次遞增計數器。

在每項工作結束時,遞減計數器,當計數器達到零時,在事件上調用Set()

在您的主線程中,請致電WaitOne()等待所有作業完成。

3

即使其他答案很好,如果你想要另一個選項(你永遠不能有足夠的選擇),那麼這是一個想法。

只需將每個作業的數據放入FIFO堆棧中的結構即可。

創建15個主題。

每個線程都會從堆棧中獲取下一個作業,並將其彈出。

當一個線程完成處理時,獲取下一個工作,如果堆棧爲空,則線程死亡或者只是睡眠,等待。

唯一的解決辦法很簡單,就是讓彈出窗口處於關鍵部分(同步讀取/彈出窗口)。

0

ThreadPool可能是要走的路。 SetMaxThreads方法將能夠限制正在執行的線程數。但是,這會限制進程/ AppDomain的最大線程數。如果進程作爲服務運行,我不會建議使用SetMaxThreads

private static ManualResetEvent manual = new ManualResetEvent(false); 
private static int count = 0; 

public void RunJobs(List<JobState> states) 
{ 
    ThreadPool.SetMaxThreads(15, 15); 

    foreach(var state in states) 
    { 
      Interlocked.Increment(count); 
      ThreadPool.QueueUserWorkItem(Job, state); 
    } 

    manual.WaitOne(); 
} 

private static void Job(object state) 
{ 
    // run job 
    Interlocked.Decrement(count); 
    if(Interlocked.Read(count) == 0) manual.Set(); 
} 
0

微軟的反應框架是一流此:

Action[] jobs = new Action[100]; 

var subscription = 
    jobs 
     .ToObservable() 
     .Select(job => Observable.Start(job)) 
     .Merge(15) 
     .Subscribe(
      x => Console.WriteLine("Job Done."), 
      () => Console.WriteLine("All Jobs Done.")) 

完成。

Just NuGet「System.Reactive」。