2012-10-02 42 views
0

我有一個簡單的示例腳本構造,它定義了三個使用python中的multiprocessing的獨立進程。我的目標是讓一個父線程產生兩個較小的線程來收集和處理數據。Python - 啓動兩個進程無限期運行

目前,我的實現看起來是這樣的:

from Queue import Queue,Empty 
from multiprocessing import Process 
import time 
import hashlib 


class FillQueue(Process): 
    def __init__(self,q): 
     Process.__init__(self) 
     self.q = q 

    def run(self): 
     i = 0 
     while i is not 5: 
      print 'putting' 
      self.q.put('foo') 
      i+=1 
     self.q.put('|STOP|') 

class ConsumeQueue(Process): 
    def __init__(self,q): 
     Process.__init__(self) 
     self.q = q 

    def run(self): 
     print 'Consume' 
     while True: 
      try: 
       value = self.q.get(False) 
       print value 
       if value == '|STOP|': 
        print 'done' 
        break; 
      except Empty: 
       print 'Nothing to process atm' 

class Ripper(Process): 

    q = Queue() 

    def __init__(self): 
     self.fq = FillQueue(self.q) 
     self.cq = ConsumeQueue(self.q) 
     self.fq.daemon = True 
     self.cq.daemon = True 

    def run(self): 
     try: 
      self.fq.start() 
      self.cq.start() 
     except KeyboardInterrupt: 
      print 'exit' 

if __name__ == '__main__': 
    r = Ripper() 
    r.start() 

由於它運行目前,從CLI腳本輸出看起來是這樣的:

putting 
putting 
putting 
putting 
putting 
Consume 
foo 
foo 
foo 
foo 
foo 
|STOP| 
done 

很顯然,我開始的方式我兩個線程被阻塞,因爲消費者甚至不開始處理隊列中的項目,直到填充程序完成添加項目。

我應該如何重寫這個以使兩個線程立即開始而不是阻止,因此消費者只需傳遞到Empty除了塊,而沒有需要處理的工作,但是當它接收到停止消息時會完全退出?

編輯:錯字,有startrun方法混合起來

回答

3

試試這個:

from Queue import Empty 
from multiprocessing import Process, Queue 
import time 
import hashlib 


class FillQueue(object): 
    def __init__(self, q): 
     self.q = q 

    def run(self): 
     i = 0 
     while i < 5: 
      print 'putting' 
      self.q.put('foo %d' % i) 
      i+=1 
      time.sleep(.5) 
     self.q.put('|STOP|') 

class ConsumeQueue(object): 
    def __init__(self, q): 
     self.q = q 

    def run(self): 
     while True: 
      try: 
       value = self.q.get(False) 
       print value 
       if value == '|STOP|': 
        print 'done' 
        break; 
      except Empty: 
       print 'Nothing to process atm' 
       time.sleep(.2) 


if __name__ == '__main__': 
    q = Queue() 
    f = FillQueue(q) 
    c = ConsumeQueue(q) 

    p1 = Process(target=f.run) 
    p1.start() 

    p2 = Process(target=c.run) 
    p2.start() 

    p1.join() 
    p2.join() 
+0

這個工作,雖然我改變了對象回從'Process'繼承和使用的run方法,而不是直接。謝謝! – DeaconDesperado

+0

現在可能要遲到了,以幫助OP,但我只是想補充說,它幫助我在每次打印後添加'sys.stdout.flush()'。當我嘗試你的例子時,它只在最後打印。 – AxP

3

我覺得你的程序工作正常。 CPU在短時間內一次只處理一件事。但是,將所有東西放入隊列所需的時間非常短。所以沒有理由說填料不能在一段時間內完成。

如果您在填充程序中添加了一些延遲,我認爲您應該看到它實際上按照您的預期工作。

3

您似乎正在使用multiprocessing.Process啓動多個進程。

但是,您正在使用Queue.Queue,它只是線程安全的,並不旨在被多個進程使用。

shevek的答案也是有效的,但作爲開始,您應該用multiprocessing.Queue替換Queue.Queue。