2014-08-28 47 views
0

我是多處理模塊的新手,想知道是否有可能產生一個可以與之通信的工作類。我有一個對象接收數據的實例,需要根據這些數據快速構建屬性,這些對象本身就是對象。以下代碼是一個簡單的情況:使用Python的多處理模塊來生成類屬性

class Worker(multiprocessing.Process): 

    def __init__(self, in_queue, out_queue): 
     multiprocessing.Process.__init__(self) 
     self.in_queue = in_queue 
     self.out_queue = out_queue 

    def run(self): 

     #Loop through the in_queue until a None is found 
     for tup in iter(self.in_queue.get,None): 

      #Try to manipulate the data 
      try: 
       self.out_queue.put(tup[0]*tup[1]) 
      except: 
       self.out_queue.put('Error') 
      self.in_queue.task_done() 

     #Remove the None from the in_queue 
     self.in_queue.task_done() 

class Master(): 

    def __init__(self,data): 

     #Initialize some data to be operated on 
     self.data=data 

    def gen_attributes(self): 

     #Initialize Queues to interact with the workers 
     in_queue=multiprocessing.JoinableQueue() 
     out_queue=multiprocessing.Queue() 

     #Create workers to operate on the data 
     for i in range(4): 
      Worker(in_queue,out_queue).start() 

     #Add data to the input queue 
     for tup in self.data: 
      in_queue.put(tup)   

     #Stop Condition for each worker 
     for _ in range(4): 
      in_queue.put(None) 
     in_queue.close() 

     #Wait for processes to finish 
     in_queue.join() 

     #Store the output as new attributes of the class 
     self.attributes=[] 
     for _ in range(0,out_queue.qsize()): 
      self.attributes.append(out_queue.get()) 

#Create and instance of the Master class, and have it generate it's own attributes using the multiprocessing module 
data=[(1,2),(2,2),(3,4)] 
M=Master(data) 
M.gen_attributes() 
print M.attributes 

實質上,Master類的實例是使用給定的數據生成的。主類然後將該數據傳遞給多個工作人員,以便在輸出隊列中進行操作和放置。主類然後使用該輸出來分配自己的屬性。

回答

0

這看起來像是一個完美的使用案例multiprocessing.Pool。您可能會注意到您的程序掛起並且Master沒有收到您期望的任何屬性。這是因爲它只是沒有收到任何信息,因爲它需要在out_queue上阻塞以便從隊列中讀取信息,因爲在程序從out_queue讀取時它是空的。

一個簡單的解決方法是阻止,並等待超時,隊列的get方法,像這樣:

while True: 
    attr = out_queue.get(True, 0.1) 
    if not attr: 
    break 
    self.attributes.append(attr) 

這是可行的,但它不是一個乾淨的解決方案。您可以對其進行修改,然後對其進行試驗以獲得所需的結果,我建議使用定點值。

+0

肖恩,謝謝你的答覆。這非常有趣。我認爲它掛在了in_queue.join()......即工人們沒有做好自己的工作。但是,如果它傳入in_queue.join(),這是不是意味着所有的工作都完成了,並且out_queue應該充滿結果?我也嘗試過檢查所有工作人員都加入了,並得到與上面相同的問題。 – Adam 2014-08-28 13:27:18

+0

肖恩,我也實現了你的解決方案,我似乎無法讓它工作。你有執行嗎? – Adam 2014-08-28 19:14:31

+0

我確實,但它在我的個人筆記本電腦上。我會在下班時嘗試發佈。 – sean 2014-08-28 19:18:18

相關問題