2010-03-18 47 views
0

的讀標準輸出我有一個腳本(worker.py),該打印的形式無緩衝輸出...的Python:非阻塞從螺紋子

1 
2 
3 
. 
. 
. 
n 

其中n是迭代在該循環的一些恆定數腳本將使。在另一個腳本(service_controller.py)中,我啓動了多個線程,每個線程使用subprocess.Popen(stdout = subprocess.PIPE,...)啓動一個子進程;現在,在我的主線程(service_controller.py)中,我想讀取每個線程的worker.py子進程的輸出,並使用它計算剩餘時間直到完成的估計值。

我有所有的邏輯工作,從worker.py讀取stdout並確定最後打印的數字。問題是,我無法弄清楚如何以非阻塞的方式做到這一點。如果我讀取一個常量bufsize,那麼每次讀取都將最終等待來自每個工作人員的相同數據。我嘗試了很多方法,包括使用fcntl,select + os.read等。這裏我最好的選擇是什麼?如果需要,我可以發佈我的源代碼,但我認爲解釋很好地描述了問題。

感謝您的任何幫助。

編輯
添加示例代碼

我有一個啓動子工人。

class WorkerThread(threading.Thread): 
    def __init__(self): 
     self.completed = 0 
     self.process = None 
     self.lock = threading.RLock() 
     threading.Thread.__init__(self) 

    def run(self): 
     cmd = ["/path/to/script", "arg1", "arg2"] 
     self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False) 
     #flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL) 
     #fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK) 

    def get_completed(self): 
     self.lock.acquire(); 
     fd = select.select([self.process.stdout.fileno()], [], [], 5)[0] 
     if fd: 
      self.data += os.read(fd, 1) 
      try: 
       self.completed = int(self.data.split("\n")[-2]) 
      except IndexError: 
       pass 
     self.lock.release() 
     return self.completed 

然後我有一個ThreadManager。

class ThreadManager(): 
    def __init__(self): 
     self.pool = [] 
     self.running = [] 
     self.lock = threading.Lock() 

    def clean_pool(self, pool): 
     for worker in [x for x in pool is not x.isAlive()]: 
      worker.join() 
      pool.remove(worker) 
      del worker 
     return pool 

    def run(self, concurrent=5): 
     while len(self.running) + len(self.pool) > 0: 
      self.clean_pool(self.running) 
      n = min(max(concurrent - len(self.running), 0), len(self.pool)) 
      if n > 0: 
       for worker in self.pool[0:n]: 
        worker.start() 
       self.running.extend(self.pool[0:n]) 
       del self.pool[0:n] 
      time.sleep(.01) 
     for worker in self.running + self.pool: 
      worker.join() 

和一些代碼來運行它。

threadManager = ThreadManager() 
for i in xrange(0, 5): 
    threadManager.pool.append(WorkerThread()) 
threadManager.run() 

我已經刪除了其他代碼的日誌,希望能夠查明問題。

+0

你在Linux或其他Unix?如果是這樣,請選擇+ os.read 1字節應該可以正常工作 - 您能告訴我們您沿該行的代碼以及它給您帶來的錯誤或不當行爲嗎? –

+0

這實際上是用於開發的Windoze,它將在Fedora或OS X上進行生產。 – sberry

回答

2

而不是讓你的service_controller被I/O訪問阻塞,只有線程循環應該讀取它自己的受控進程輸出。

然後,您可以在控制進程的線程對象中擁有方法以獲取最後的輪詢輸出。

當然,不要忘記在這種情況下使用一些鎖定機制來保護緩衝區,該緩衝區將被線程用來填充它,以及控制器調用的方法來獲取它。

+0

我離你的建議遠嗎?我有線程對象控制進程得到最後的輪詢輸出... – sberry

+0

你的get_completed方法只會填充self.completed,我會建議將它重命名爲update_completed。然後添加一個get_completed方法返回self.completed,(添加一個threading.lock來保護對它的訪問)。 然後在您的線程管理器中,您可以定期調用您的工作人員的get_completed。 – dweeves

+0

get_completed方法實際上應該返回self.completed(我在重新輸入時忽略了它)。我在閱讀中加入了RLock,但我仍然遇到同樣的問題。 – sberry

1

您的方法WorkerThread.run()啓動子進程,然後立即終止。 Run()需要執行輪詢並更新WorkerThread.completed,直到子進程完成。