2014-03-06 147 views
1

我有一個python函數必須總共運行12次。我現在已經設置了使用多處理庫中的Pool來並行運行所有它們。通常,我一次運行6個,因爲該功能是CPU密集型的,並行運行12經常會導致程序崩潰。當我們每次做6個時,第二個6將不會開始,直到前6個過程完成。理想情況下,我們希望另一個(例如第7個)在第一批6中的一個完成後立即啓動 - 以便在有更多啓動時有6個運行。眼下代碼如下所示(它會被調用兩次,通過在一個列表中的第6個元素,然後在另一個第二個6:Python多處理工作者/隊列

from multiprocessing import Pool 

def start_pool(project_list): 

    pool = Pool(processes=6) 
    pool.map(run_assignments_parallel,project_list[0:6]) 

所以我一直在努力實現一個工人/隊列解決方案,遇到了一些問題,我有一個工人功能,看起來像這樣:

def worker(work_queue, done_queue): 
    try: 
     for proj in iter(work_queue.get, 'STOP'): 
      print proj 
      run_assignments_parallel(proj) 
      done_queue.put('finished ' + proj) 
    except Exception, e:   
     done_queue.put("%s failed on %s with: %s" % (current_process().name, proj,  e.message)) 
    return True 

和代碼來調用工人功能如下:

workers = 6 
work_queue = Queue() 
done_queue = Queue() 
processes = [] 
for project in project_list: 
    print project 
    work_queue.put(project) 
for w in xrange(workers):   
    p = Process(target=worker, args=(work_queue, done_queue)) 
    p.start() 
    processes.append(p) 
    work_queue.put('STOP') 
for p in processes: 
    p.join()  
    done_queue.put('STOP') 
for status in iter(done_queue.get, 'STOP'):   
    print status 

project_list只是對於需要在功能運行的12個項目的路徑列表「run_assignments_parallel。」

這是現在寫入的方式,函數獲取調用一次以上相同的過程(項目),我真的不能告訴發生了什麼事。這段代碼是基於我找到的一個例子,我非常確定循環結構是混亂的。任何幫助都會很棒,我對此表示無知。謝謝!

+0

好你,你會得到錯誤,請檢查所有 – Mark

+0

PXL固定的格式明顯但鑑於這是所有其他線正確 - 看跌期權(「SOP」)不使用Python同意docs example – Mark

回答

3

理想情況下,我們想另外一個(例如7日)到只要一揭開序幕從首批的6 finished-所以這6在一次運行時,有更多的開始。

所有你需要改變的是通過所有12個輸入參數,而不是6:

from multiprocessing import Pool 
pool = Pool(processes=6) # run no more than 6 at a time 
pool.map(run_assignments_parallel, project_list) # pass full list (12 items) 
+0

謝謝!這確實是正確的,我曾嘗試過,現在我再次意識到爲什麼我的問題稍微複雜一點 - 該函數使用來自某些專有軟件的API調用不能銷燬和重新創建的對象。換句話說,每個項目都必須在一個單獨的進程上運行,這就是爲什麼start_pool函數會被第二次調用6的原因。關於如何解決這個問題的任何想法? – user2503169

+0

@ user2503169:a)檢查該對象是否已經創建,例如,使其成爲一個全局對象,如果它不是'None',則不要嘗試重新創建它。b)或者如果您想爲每個任務創建一個新進程:[''maxtasksperchild = 1'](http://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool)c)創建12個'Process'並使用'Semaphore'來避免超過6次運行。 – jfs

+0

看起來像maxtaskperchild將是一個簡單的解決方案,但我運行2.6(由於專有軟件)。你能告訴我如何使用Semaphore?我非常感謝幫助! – user2503169

4

可以使用MPipe模塊。

創建一個6人工單級管線,並將所有項目的飼料作爲任務。然後,只需閱讀結果(在您的情況下,狀態)結束。

from mpipe import Pipeline, OrderedStage 

...  

pipe = Pipeline(OrderedStage(run_assignments_parallel), 6)  

for project in project_list: 
    pipe.put(project) 

pipe.put(None) # Signal end of input. 

for status in pipe.results(): 
    print(status) 
與indetation