2015-05-23 179 views
1

我正在處理我的代碼。代碼花費很長時間才能在單個CPU上完成。
所以,我在想如果有可能讓代碼進行並行處理。
碼幀像:並行處理python代碼

def analyze_data(target_path): 
    import os 
    import math 
    import itertools 
    import numpy 
    import scipy 
    .... 
    for files in target_path: 
     <a real long series of calculations...... 
     ...................> 

    return 
#Providing the dir search path: 
dir_path = "/usr/target_dir/" 
analyze_data(target_path=dir_path) 

此代碼正在這樣的方式長到結束(文件數明顯工藝是巨大的)。
現在有什麼辦法可以在多處理線程中執行這種簡單的編碼格式以使其運行得更快?

謝謝。

回答

0

見(用於python3)的文檔:https://docs.python.org/3.4/library/multiprocessing.html

如果你可以分割你的目錄來進行處理:

from multiprocessing import Pool 

def analyze_data(target_path): 
    import os 
    import math 
    import itertools 
    import numpy 
    import scipy 
    .... 
    for files in target_path: 
     <a real long series of calculations...... 
     ...................> 

    return 
#Providing the dir search path: 

analyze_data(target_path=dir_path) 

if __name__ == '__main__': 
    with Pool(5) as p: 
     dir_path1 = "/usr/target_dir/1" 
     dir_path2 = "/usr/target_dir/2" 
     dir_path3 = "/usr/target_dir/3" 
     print(p.map(analyze_data, [dir_path1, dir_path2, dir_path3])) 
+0

還是「M沒有得到......」怎麼每個文件(目錄擁有這些文件的40/50),大約需要2小時才能完成。所以我正在尋找一種有效的方法來解決這個問題... – diffracteD

+0

'print(p.map(analyze_data,[dir_path1,dir_path2,dir_path3]))'這行是否將每個目錄傳遞給單個線程?我可能需要將每個文件分發到多個線程以使其更快。 – diffracteD

+0

我很確定該行將命令分配給多進程,但體驗python用戶可能會確認這一點。 – maggick

0

使用的multiprocessing叉叫pathos.multiprocessing,這可真方便......和可以很自然地從翻譯中完成。我還將利用pox,它具有一些除ossys模塊之外的文件系統實用程序。首先查看我設置的測試文件。每個目錄中有幾個文件。

>>> import os 
>>> os.path.abspath('.') 
'/tmp' 
>>> import pox 
>>> # find all the .txt files in and below the current directory 
>>> pox.find('*.txt', '.') 
['/tmp/xxx/1.txt', 'tmp/xxx/2.txt', 'tmp/xxx/3.txt', 'tmp/yyy/1.txt', 'tmp/yyy/2.txt', 'tmp/zzz/1.txt', 'tmp/zzz/2.txt', 'tmp/zzz/3.txt', 'tmp/zzz/4.txt'] 
>>> # let's look at the contents of one of the files 
>>> print open('xxx/1.txt', 'r').read() 
45125123412 
12341234123 
12342134234 
23421342134 

所有的文件都有類似的內容......所以讓我們開始並行處理文件。

>>> import time 
>>> import pathos 
>>> # build a thread pool of workers 
>>> thPool = pathos.multiprocessing.ThreadingPool 
>>> tp = thPool() 
>>> 
>>> # expensive per-file processing 
>>> def doit(file): 
...  with open(file, 'r') as f: 
...   x = sum(int(i) for i in f.readlines()) 
...  time.sleep(1) # make it 'expensive' 
...  return len(str(x))**2 # some calculation 
... 
>>> # grab all files from a directory, then do some final 'analysis' 
>>> def analyze_data(target_path): 
...  return min(*tp.uimap(doit, pox.find('*.txt', target_path))) 
... 
>>> analyze_data('.') 
121 

其實,analyze_data有些無關緊要,因爲find並不需要在每個目錄的基礎工作......但是這是在問題中指定的結構。在這裏,您將用昂貴的每文件任務替換大多數doit,並將用每個目錄處理替換min。根據計算的花費多少,您可能需要使用pathos.multiprocessing.ProcessingPool而不是ThreadingPool - 前者將產生多個進程,而後者只產生多個線程。前者有更多的開銷,但可以更好地並行處理更昂貴的任務。在這裏,我們使用uimap來爲每個文件調用doit提供一個無序迭代器。

獲取pathospox這裏:https://github.com/uqfoundation