2012-03-06 145 views
2

目前我在需要一種簡單高效的線程池實現的。我在這裏和谷歌搜索了很多有趣的鏈接,但到目前爲止我找到的東西似乎都很合適。我在網絡上發現的大多數實現都太複雜或者缺乏一些我需要的關鍵特性。實現一個簡單的線程池

此外,我不希望使用的代碼,我不明白,所以我決定編寫它自己(有時重新發明輪子幫助我向前推動自己在知識和經驗方面)。我當然明白線程池背後的基本思想,但是一些實現細節對我來說還是有點不清楚。這可能是因爲我需要的線程池有點特別。讓我描述一下。我有一個任務在特定的(大)緩衝區上完成了數十萬次。我測量過,如果我爲這個任務使用線程,性能會好很多 - 緩衝區被分割成子緩衝區,每個線程都在子緩衝區上執行任務並返回結果。然後所有線程的所有結果加在一起,給我最終的解決方案。

然而,因爲這樣做很多時候我失去了,因爲這麼多的線程創建的寶貴的時間(因爲自帶的線程創建的開銷)。所以我想有一個線程池來執行這個任務,而不是每次創建一組新的線程。

更清楚,這是我到目前爲止有:

  • 斯普利特緩衝成相同大小
  • 對於每個子緩衝器的N個子緩衝區,創建一個線程並運行它在副緩衝
  • 等待所有線程完成(WaitForMultipleObjects的),加在一起的結果和destory螺紋
  • 重複

我想要實現的是這樣的:

  • 拆分緩衝成相同大小
  • 分配每個子緩衝器從線程池的螺紋(其具有正好N個線程)
  • 的N個子緩衝器
  • 一旦線程結束,讓它睡覺,直到當所有的線程完成(睡覺),另一個任務是準備
  • 加在一起,他們通過喚醒線程產生
  • 重複的結果,併爲它們分配新的任務

正如你可以看到,這是一個有點特殊的線程池,因爲我需要等待線程完成。基本上我想擺脫創建線程的所有時間的開銷,因爲程序經過成千上萬次迭代的,因此它可以創建&銷燬線程的milions在其生命週期。好消息是,我不需要在線程之間進行任何同步,他們都獲得了自己的數據和存儲空間。但是,我必須等到所有線程完成後纔能有最終解決方案,因爲下一個任務取決於前一個任務的結果。

我的主要問題是與線程的管理:

  • 如何讓我的線程「休眠」,並喚醒他們一次新的任務是準備好了?
  • 我該如何等待所有線程完成?

我會感謝任何幫助。如果我不夠清楚,也可以隨時提問。謝謝!

+1

我沒有看到它有什麼特別之處,阻塞線程完成是完全正常的。由OS實現的線程池將工作得很好。使用QueueUserWorkItem(),CreateEvent和SetEvent進行同步。 – 2012-03-06 11:22:31

+0

@HansPassant:嗨,謝謝你的建議。你可能會把它作爲一個答案與更多的細節?我不熟悉你提到的功能。 – PeterK 2012-03-06 11:37:38

+0

您發現WaitForMultipleObjects()的地方相同,請使用MSDN Library。 – 2012-03-06 11:46:16

回答

2

對於我的首選方法用於與線程進行通信是經由條件變量。因爲您可以在變化時定義所需的條件和信號。在你的情況下,你可以將與一個隊列結合使用,子緩衝區通過該隊列,所以每個線程在隊列爲空時等待。所述結果然後可以在另一個隊列其中管理隊列等待,直到所有線程已經發布的結果到隊列放(這個隊列中的參考作爲請求連同子緩衝器傳遞)。

0

如何讓我的線程「休眠」,並喚醒他們一旦有新的任務是 準備好了嗎?

您使用互斥或​​信號,根據您的情況(在某些情況下,你可能需要使用一個條件變量或自動/手動resetevent),使線程彼此等待或喚醒時的東西發生。

我如何等待所有的線程完成?

你必須使用join()每個線程在等待它完成。所以如果你有一個線程集合,你可能想要在每個仍在運行的線程上調用join。

作爲一個單獨的音符:線程池就已經存在,你可以只使用之類的東西,而不是Boost.Threadpool重新發明輪子的。

+0

請問爲什麼你更喜歡互斥和信號來調節變量?我想我已經看到了某個地方,但我不確定這是否是一種好的做法。 – 2012-03-06 10:03:13

+0

@ J.N。我從來沒有見過或讀過任何互斥體或信號量不好的想法。 – 2012-03-06 10:09:51

+1

當作爲互斥體或信號量使用時,我也不是。另一方面,互斥體和信號量並不意味着阻止線程等待其他人完成任務,而是意味着共享資源。 – 2012-03-06 10:13:01

1

你看過其他線程池的實現嗎?例如http://threadpool.sourceforge.net/。你想要完成的不是全新的。一種使線程等待新任務的方法是在另一個任務準備好時阻塞互斥體並解除該互斥體。您也可以讓線程通知使用某種來自線程的通知返回給父級。

在我行我已經使用線程池被工作/線程嚴重,並已使用ØMQ跨線程的通信,這使得線程從ØMQ一個read()請求時,它已經準備好新的工作阻止。

隨着一點點研究,並隨着時間和精力一點點,你應該能夠找出如何可以建立或利用現有的框架/工具來建立你所需要的。然後當你有一些你遇到問題的代碼時,你可以回到SO。

0

「正如你所看到的,這是一個有點特殊的線程池,因爲我需要等待線程完成。」 - 不完全的。您希望處理作業中最後一項任務的線程提供作業完成通知。完成通知是一個theadPool的正常功能,否則,始發線程將無法處理一組完整的結果。池通常同時處理多個任務/任務層次結構,因此完成通知方法應該是線程無關的 - 不需要join()或任何類型的東西。此外,沒有WaitForMultipleObject() - 使用難以管理且僅限於64個對象的同步對象數組。

線程池通常有一個線程池在生產者 - 消費者隊列中等待任務。這些任務通常從一些提供線程池服務的'Ctask'類繼承。完成深化和通知的機制就是其中之一。

生產者 - 消費者隊列本質上是由一個互斥體,並在隊列計數任務的信號量和線程等待的保護,多址「正常」隊列類。池線程每個通過該隊列,他們永遠循環下去,等待隊列信號,從鎖定隊列彈出的任務,然後和callling的run()中的任務的方法獲得。

每個任務都會有加載數據成員線程池,因爲它被提交到池中。這允許任務在需要時提交更多任務。每個任務的

完成通常是通過調用事件方法是任務中的一員,由始發線程加載之前的任務提交給池某通知。

任務也應該有一個子任務的原子倒計時整數和事件等對其他任務的完成。

怎麼可能這項工作在你的榜樣?你可以有一個'主要'任務提交數組處理任務並等待他們全部完成。

teh池中應該有更多的線程比核心。我建議兩倍。

該數組需要拆分,以便爲每個部分使用單獨的任務。有多少任務 - 足夠使可用內核全部用完,但不會產生太多以至於生成過多的上下文切換。對於任何合理大小的陣列,可以說64個任務是合理的分割 - 超過可用處理器的典型數量。此外,這些任務不應該按順序拆分以避免虛假分享。

因此,這種「主要」陣列處理任務。使用數組引用加載它,並將其完成事件設置爲指向某種發信號事件的方法。將任務提交給池,等待事件。

任務被加載到一個線程。它的run()方法使用兩個循環並創建32個數組處理任務,每個任務都有自己的起始索引和長度到數組中,但具有非連續的起始索引。該任務使用自己的繼承的submit()方法將32個新任務中的每一個加載到池中。以及對線程實際上排隊的任務執行,這個提交(也)原子遞增完成計數整數,並將任務私人完成事件完成事件排隊的任務之前。私有完成事件原子 - 遞減完成計數並且如果爲零則表示事件。在提交所有32個數組處理事件後,主要任務將等待私人完成事件。

因此,32個數組處理任務在線程上運行。隨着每個完成,正在運行它的線程將調用其完成事件,該事件會減少主任務中的完成計數整數。最後,最後一個數組處理任務完成並且完成計數整數遞減爲零,以便通知運行主任務的線程正在等待的事件。主任務調用它自己的完成事件,因此發信號通知主任務發起者正在等待的事件。

主要任務發起者在完全處理陣列時運行。

..或者,你可以使用一個已經工作,別人的建議線程池類。