2014-10-31 37 views
2

我想要創建一個函數,該函數在給定函數和相對參數列表的情況下,儘可能多地啓動進程以並行化這些任務。正在運行的進程數量不能超過我的CPU的核心數量。當一個過程完成時,它應該被另一個替換,直到結束。在多個CPU環境中並行處理大量函數

我試圖用python Pools來實現這樣的事情。這是我的功能:

from multiprocessing import Pool, cpu_count 

CPUS = cpu_count() 

def parallelize(functions, args): 
    results = [] 
    if CPUS > 1: 
     for i in xrange(0, len(functions), CPUS): 
      pool = Pool() 
      for j in xrange(CPUS): 
       if i + j >= len(functions): 
        break 
       results.append(pool.apply_async(functions[i + j], args = args[i + j])) 
      pool.close() 
      pool.join() 
     map(lambda x: x.get(), results) 
    else: 
     for i in xrange(len(functions)): 
      results.append(functions[i](*args[i])) 
    return results 

此實現細分批量函數列表。每個批量維度都等於實際CPU的數量。問題在於,它實際上一直等到每個批量的功能完成,然後再次啓動另一批量的過程。
我不想要這種行爲,因爲如果在批量中有一個非常慢的函數,其他cpus將在開始新進程之前等待它完成。

什麼是正確的方法?

+0

@dano'methods'是功能列表。所以'方法[i + j]'是一個函數。它有什麼問題? – ProGM 2014-10-31 14:55:32

+0

啊,對不起。我認爲這是一個切片,出於某種原因。忽略我:) – dano 2014-10-31 14:56:54

+0

好吧沒問題:) – ProGM 2014-10-31 14:57:35

回答

2

看起來你似乎太過複雜了。無論您給它多少工作項,multiprocessing.Pool將始終以您告訴它的進程數完全運行。因此,如果您創建Pool(CPUS),那麼Pool將永遠不會同時運行超過CPUS個任務,即使您爲其提供CPUS * 100任務。因此,如果沒有您做任何特殊工作,它可以滿足您的要求,即永遠不會運行比您擁有CPU更多的任務。因此,您可以遍歷整個方法和參數列表,並在其上調用apply_async,而不用擔心批量調用。該Pool將同時執行所有的任務,但也決不超過CPUS任務:

def parallelize(methods, args): 
    results = [] 
    if CPUS > 1: 
     pool = Pool(CPUS) 
     for method, arg in zip(methods, args): 
      results.append(pool.apply_async(method, args=arg)) 
     pool.close() 
     pool.join() 
     out = map(lambda x: x.get(), results) 
    else: 
     for i in xrange(len(methods)): 
      results.append(methods[i](*args[i])) 
    return results 
+0

哦,比我想象的簡單。謝謝! – ProGM 2014-10-31 15:04:36