2012-01-27 104 views
26

我想在使用過程對象的Python中使用工作者池。每個工作人員(一個進程)都會進行一些初始化(需要花費很少的時間),通過一系列工作(理想情況下使用map())並返回一些內容。除此之外,沒有必要進行溝通。但是,我似乎無法弄清楚如何使用map()來使用我的工作人員的compute()函數。python池與工人進程

from multiprocessing import Pool, Process 

class Worker(Process): 
    def __init__(self): 
     print 'Worker started' 
     # do some initialization here 
     super(Worker, self).__init__() 

    def compute(self, data): 
     print 'Computing things!' 
     return data * data 

if __name__ == '__main__': 
    # This works fine 
    worker = Worker() 
    print worker.compute(3) 

    # workers get initialized fine 
    pool = Pool(processes = 4, 
       initializer = Worker) 
    data = range(10) 
    # How to use my worker pool? 
    result = pool.map(compute, data) 

是作業隊列去替代方式,或者我可以使用map()

+0

所有過程對象是有狀態。您可能希望從標題中刪除該單詞。也。 'compute'是Worker的一種方法。在這些例子中,它通常是一個完全獨立的功能。爲什麼不寫計算函數來簡單地包括初始化和處理? – 2012-01-27 19:48:48

+0

夠公平的,謝謝。初始化需要很長時間,所以我只想在每個工作進程中執行一次。 – Felix 2012-01-27 20:14:18

+0

你必須要強調「通過一系列工作」的部分問題。由於這並不明顯。 – 2012-01-27 20:19:45

回答

50

我建議你爲此使用一個隊列。

class Worker(Process): 
    def __init__(self, queue): 
     super(Worker, self).__init__() 
     self.queue= queue 

    def run(self): 
     print 'Worker started' 
     # do some initialization here 

     print 'Computing things!' 
     for data in iter(self.queue.get, None): 
      # Use data 

現在,你可以從一個單一的隊列

request_queue = Queue() 
for i in range(4): 
    Worker(request_queue).start() 
for data in the_real_source: 
    request_queue.put(data) 
# Sentinel objects to allow clean shutdown: 1 per worker. 
for i in range(4): 
    request_queue.put(None) 

這種事情應該讓你分攤在多個工人昂貴的啓動成本開始了一堆這些,都讓工作。

+0

這就是我想的,謝謝!我最終使用作業隊列(輸入)和結果隊列(輸出)來同步所有內容。 – Felix 2012-01-30 18:44:55

+0

你的例子真棒,我現在嘗試如何輸入哨兵對象時,沒有exepction按下strg + c – Dukeatcoding 2013-06-26 09:55:48

+0

@ S.Lott:是不是隊列不可pickle-able?這就是爲什麼你使用[multiprocessing.Manager().Queue](http://stackoverflow.com/questions/3217002/how-do-you-pass-a-queue-reference-to-a-function-managed-by -pool-MAP-異步)? – zuuz 2013-12-16 12:51:52

4

initializer需要一個可執行的可調用來執行初始化,例如,它可以設置一些全局變量,而不是一個Process子類; map接受一個任意的可迭代:

#!/usr/bin/env python 
import multiprocessing as mp 

def init(val): 
    print('do some initialization here') 

def compute(data): 
    print('Computing things!') 
    return data * data 

def produce_data(): 
    yield -100 
    for i in range(10): 
     yield i 
    yield 100 

if __name__=="__main__": 
    p = mp.Pool(initializer=init, initargs=('arg',)) 
    print(p.map(compute, produce_data()))