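Converting a serial task to parallel while mapping inputs to outputs
I have thousands of simulations to run on a system with several cores. Currently this is done serially: I know my input parameters, and I store the results in a dict.
Serial version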
import time
import random

class MyModel(object):
    input = None
    output = None

    def run(self):
        time.sleep(random.random())  # simulate a complex task
        self.output = self.input * 10

# Run serial tasks and store results for each parameter
parameters = range(10)
results = {}
for p in parameters:
    m = MyModel()
    m.input = p
    m.run()
    results[p] = m.output

print('results: ' + str(results))
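This takes less than 10 seconds and shows the correct results:
results: {0: 0, 1: 10, 2: 20, 3: 30, 4: 40, 5: 50, 6: 60, 7: 70, 8: 80, 9: 90}
Parallel version
My attempt to parallelize this procedure is based on the example in the multiprocessing module documentation near the text "An example showing how to use queues to feed tasks to a collection of worker processes and collect the results" (sorry, no URL anchor available).
The following builds on the top half of the serial version: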
from multiprocessing import Process, Queue

NUMBER_OF_PROCESSES = 4

def worker(input, output):
    for args in iter(input.get, 'STOP'):
        m = MyModel()
        m.input = args[0]
        m.run()
        output.put(m.output)

# Run parallel tasks and store results for each parameter
parameters = range(10)
results = {}

# Create queues
task_queue = Queue()
done_queue = Queue()

# Submit tasks
tasks = [(t,) for t in parameters]
for task in tasks:
    task_queue.put(task)

# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
    Process(target=worker, args=(task_queue, done_queue)).start()

# Get unordered results
for i in range(len(tasks)):
    results[i] = done_queue.get()

# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
    task_queue.put('STOP')

print('results: ' + str(results))
This now takes only a few seconds, but the mapping between inputs and results is mixed up:
results: {0: 10, 1: 0, 2: 60, 3: 40, 4: 20, 5: 80, 6: 30, 7: 90, 8: 70, 9: 50}
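I realize that I am populating results from an unordered done_queue.get(), but I don't know how to map each result back to the task it came from in task_queue. Any ideas? Is there any other way to make this cleaner?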
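For illustration, here is a minimal sketch of one possible way to keep the mapping: each worker puts an (input, output) pair on done_queue, and the parent uses the input as the dict key instead of the arrival index. The helper name collect is hypothetical, the sleep is shortened so the sketch runs quickly, and the `if __name__ == '__main__':` guard is an addition that start methods which re-import the main module tend to need. Treat it as a sketch under those assumptions rather than the definitive fix.

```python
import random
import time
from multiprocessing import Process, Queue

NUMBER_OF_PROCESSES = 4


class MyModel(object):
    """Same toy model as in the question."""
    input = None
    output = None

    def run(self):
        time.sleep(random.random() / 10)  # simulate a (shortened) complex task
        self.output = self.input * 10


def worker(task_queue, done_queue):
    # Consume tasks until the 'STOP' sentinel arrives.
    for args in iter(task_queue.get, 'STOP'):
        m = MyModel()
        m.input = args[0]
        m.run()
        # Send the input back together with the output, so the parent
        # does not depend on the order in which results arrive.
        done_queue.put((m.input, m.output))


def collect(done_queue, count):
    # Hypothetical helper: build the input -> output mapping
    # from `count` (input, output) pairs.
    results = {}
    for _ in range(count):
        key, value = done_queue.get()
        results[key] = value
    return results


if __name__ == '__main__':
    parameters = range(10)
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for p in parameters:
        task_queue.put((p,))

    # Start worker processes
    processes = [Process(target=worker, args=(task_queue, done_queue))
                 for _ in range(NUMBER_OF_PROCESSES)]
    for proc in processes:
        proc.start()

    # Results still arrive in arbitrary order, but each one carries its key.
    results = collect(done_queue, len(parameters))

    # Tell child processes to stop, then wait for them to exit.
    for _ in processes:
        task_queue.put('STOP')
    for proc in processes:
        proc.join()

    print('results: ' + str(results))
```

The dict keys may be inserted in a different order from run to run, but each key should now point at its own output. If the queues are not otherwise needed, multiprocessing.Pool.map returns results in the same order as its inputs, which may be a cleaner alternative.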