2014-12-13 171 views
3

結合我不知道如果我所試圖做的是一個有效的做法,但這裏有雲: 我需要我的程序進行高度並行化,所以我想我可以做2-3進程,每個進程可以有2-3個線程。Python的多處理多線程

1)這可能嗎? 2)有什麼意思? 3)這是我的代碼,但當我嘗試加入過程時,它會掛起。

PQ = multiprocessing.Queue() 

[...]

def node(self, files, PQ): 

     l1, l2 = self.splitList(files) 
     p1 = multiprocessing.Process(target=self.filePro, args=(l1,PQ,)) 
     p2 = multiprocessing.Process(target=self.filePro, args=(l2,PQ,)) 
     p1.daemon = True 
     p2.daemon = True 
     p1.start() 
     p2.start() 

     p1.join() # HANGS HERE 
     p2.join() 
     while 1: 
      if PQ.empty(): 
       break 
      else: 
       print(PQ.get()) 
     PQ.join() 

    def filePro(self,lst,PQ): 
     TQ = queue.Queue() 
     l1, l2 = self.splitList(lst) 
     t1 = threading.Thread(target=self.fileThr, args=('a',l1,TQ,)) 
     t2 = threading.Thread(target=self.fileThr, args=('b',l2,TQ,)) 
     t1.daemon = True 
     t2.daemon = True 
     t1.start() 
     t2.start() 

     t1.join() 
     t2.join() 
     while 1: 
      if TQ.empty(): 
       break 
      else: 
       PQ.put(TQ.get()) 
       TQ.task_done() 
     TQ.join() 

def fileThr(self,id,lst,TQ): 
     while lst: 
      tmp_path = lst.pop() 
      if (not tmp_path[1]): 
       continue 
      for item in tmp_path[1]: 
       TQ.put(1) 
     TQ.join() 
+0

當我需要最大限度地利用cpu時,我使用了進程,當我阻塞磁盤訪問,網絡等操作時使用線程。因此,如果我有腳本來下載許多文件,我會創建一個線程池並使用它。如果我有一個分佈式計算可以達到峯值cpu,那麼我會使用一個進程池。 – 2014-12-13 03:52:27

+0

如果您希望我們調試您的代碼,我們需要[最小,完整,可驗證的示例](http://stackoverflow.com/help/mcve)。 – abarnert 2014-12-13 04:21:19

回答

6

1)這是可能的嗎?

是的。


2)是否有任何點成嗎?

是的。但通常不是你要找的點。首先,幾乎每個現代操作系統都使用「扁平」調度程序;還有分散在3個方案或在8個項目8線程8個線程之間沒有差異。*

*有些程序可以通過仔細使用進程內,只鎖或其他同步原語在一些地方,你知道得到顯著的好處你只與同一個程序中的線程共享 - 當然,通過避免在這些地方共享內存 - 但你不會通過平均地跨線程和線程跨線程分散你的作業來獲得這種好處。其次,即使您在默認的CPython解釋器中使用舊的SunOS,全局解釋器鎖(GIL)也可以確保一次只有一個線程可以運行Python代碼。如果你花時間運行顯式釋放GIL的C擴展庫中的代碼(就像一些NumPy函數一樣),線程可以提供幫助,但否則它們都會以序列化的方式結束。

主要情況線程和進程是有用在一起就是你有兩個CPU綁定和I/O密集型工作。在這種情況下,通常一方正在餵食另一方。如果I/O提供給CPU,則使用主進程中的單個線程池來處理I/O,然後使用一組工作進程來對結果執行CPU工作。如果相反,使用工作進程池來完成CPU工作,然後讓每個工作進程使用線程池來執行I/O。


3)這是我的代碼,但它掛起當我嘗試加入的進程。

當您不給minimal, complete, verifiable example時,調試代碼非常困難。

但是,我可以看到一個明顯的問題。

您正在嘗試使用TQ作爲生產者 - 消費者隊列,與t1t2作爲生產者和filePro父爲消費者。在t1.join()t2.join()返回之前,您的消費者不會撥打TQ.task_done(),直到這些線程完成後纔會發生。但是這些製作人不會完成,因爲他們正在等待你撥打TQ.task_done()。所以,你有一個僵局。

而且,由於您的每個子進程的主線程都處於死鎖狀態,所以它們永遠都不會完成,因此p1.join()將永遠阻塞。

如果你真的想讓主線程在做任何工作之前等待其他線程完成,那麼你不需要生產者 - 消費者習慣用法;只要讓孩子們在不打電話TQ.join()的情況下完成他們的工作並退出,並且不要打擾父母的TQ.task_done()。 (請注意,您已經在與PQ正確地做這個。)

如果,另一方面,你希望他們並行工作,不要試圖join子線程,直到你完成你的循環。

+0

謝謝!這是一個非常完整的答案,但是現在我還有1個關於你的第二個答案的問題。 1)關於GIL,這是否意味着如果我產生了30個線程,它將與產卵1相同?因爲你說他們最終被序列化了...... – 2014-12-13 11:42:23

+0

@安吉洛已知:不,它不完全相同。你不會得到*並行性*。也就是說,即使你有32個內核,使用30個線程的運行速度也不會比使用1快。但是,你確實得到* concurrency *。線程在你的任務之間自動交錯工作,並且它們可以自動處理阻塞 - 例如,如果一個線程正在等待I/O,系統將安排一個不同的線程來運行,而不是阻塞整個程序。除非你編寫的代碼是_explicitly_死鎖(如你的例子),一個線程不會阻止另一個進程。 – abarnert 2014-12-17 23:47:25

+0

@AngeloUknown:我找不到一個很好的資源來討論Python特有術語的差異,但Haskell wiki上的[Parallelism vs. Concurrency](https://www.haskell.org/haskellwiki/Parallelism_vs._Concurrency)如果你忽略Haskell特定的東西,這是一個相當不錯的概述。 – abarnert 2014-12-17 23:48:33