2013-07-04 51 views
3

我有成千上萬的模擬運行在具有多個內核的系統上。目前,它是以串行方式完成的,我知道我的輸入參數,並將結果存儲在字典中。將串行任務轉換爲並行映射輸入和輸出

串行版本

import time 
import random 

class MyModel(object): 
    input = None 
    output = None 

    def run(self): 
     time.sleep(random.random()) # simulate a complex task 
     self.output = self.input * 10 


# Run serial tasks and store results for each parameter 

parameters = range(10) 
results = {} 

for p in parameters: 
    m = MyModel() 
    m.input = p 
    m.run() 
    results[p] = m.output 

print('results: ' + str(results)) 

這需要< 10秒,並顯示正確的結果:

我嘗試並行這個過程是基於在該示例

results: {0: 0, 1: 10, 2: 20, 3: 30, 4: 40, 5: 50, 6: 60, 7: 70, 8: 80, 9: 90} 

並行版本multiprocessing模塊附近的文字"An example showing how to use queues to feed tasks to a collection of worker processes and collect the results"(抱歉,沒有可用的URL錨點)。

以下基礎上的串行版本的上半部分:

from multiprocessing import Process, Queue 
NUMBER_OF_PROCESSES = 4 

def worker(input, output): 
    for args in iter(input.get, 'STOP'): 
     m = MyModel() 
     m.input = args[0] 
     m.run() 
     output.put(m.output) 


# Run parallel tasks and store results for each parameter 

parameters = range(10) 
results = {} 

# Create queues 
task_queue = Queue() 
done_queue = Queue() 

# Submit tasks 
tasks = [(t,) for t in parameters] 
for task in tasks: 
    task_queue.put(task) 

# Start worker processes 
for i in range(NUMBER_OF_PROCESSES): 
    Process(target=worker, args=(task_queue, done_queue)).start() 

# Get unordered results 
for i in range(len(tasks)): 
    results[i] = done_queue.get() 

# Tell child processes to stop 
for i in range(NUMBER_OF_PROCESSES): 
    task_queue.put('STOP') 

print('results: ' + str(results)) 

現在只需要幾秒鐘,但投入和結果之間的映射命令混淆在一起。

results: {0: 10, 1: 0, 2: 60, 3: 40, 4: 20, 5: 80, 6: 30, 7: 90, 8: 70, 9: 50} 

我意識到,我基於一個無序done_queue.get()填充results,但我不知道怎麼去正確的映射到task_queue。有任何想法嗎?任何其他方式來使這種方式更清潔?

回答

1

A-ha!工作人員需要嵌入某種ID,例如用於返回到輸出隊列的輸入參數,該輸入參數可用於識別返回的進程。這裏有必要的修改:

def worker(input, output): 
    for args in iter(input.get, 'STOP'): 
     m = MyModel() 
     m.input = args[0] 
     m.run() 
     # Return a tuple of an ID (the input parameter), and the model output 
     return_obj = (m.input, m.output) 
     output.put(return_obj) 

# Get unordered results 
for i in range(len(tasks)): 
    # Unravel output tuple, which has the input parameter 'p' used as an ID 
    p, result = done_queue.get() 
    results[p] = result 
相關問題