2014-02-07 210 views
2

我試圖在Python中輸出一個冗長的命令輸出到標準輸出和標準錯誤。我想輪詢子進程並將輸出寫入單獨的文件。同時讀取子進程標準輸出和標準錯誤

我嘗試以下,在此基礎上回答Non-blocking read on a subprocess.PIPE in python

import subprocess 

from Queue import Queue, Empty 
from threading import Thread 

def send_cmd(cmd, shell=False): 
    """ 
    Send cmd to the shell 
    """ 
    if not isinstance(cmd, list): cmd = shlex.split(cmd) 

    params = {'args' : cmd, 
       'stdout' : subprocess.PIPE, 
       'stderr' : subprocess.PIPE, 
       'shell' : shell} 

    proc = subprocess.Popen(**params) 

    return proc 

def monitor_command(process, stdout_log=os.devnull, stderr_log=os.devnull): 
    """ 
    Monitor the process that is running, and log it if desired 
    """ 
    def enqueue_output(out, queue): 
     for line in iter(out.readline, b''): 
      queue.put(line) 

    def setup_process(log_name, proc): 
     FID = open(log_name, 'w') 
     queue = Queue() 
     thread = Thread(target=enqueue_output, args=(proc, queue)) 
     thread.daemon = True # Thread dies with program 
     thread.start() 

     return (queue, FID) 

    def check_queues(queue_list, errors): 
     for queue, FID in queue_list: 
      try: 
       line = queue.get_nowait() 
       if 'error' in line.lower() or 'failed' in line.lower(): 
        errors.append(line) 
      except Empty: 
       pass 
      else: 
       FID.write(line) 

    errors = [] 
    queue_list = [] 

    for log, proc in [(stdout_log, process.stdout), (stderr_log, process.stderr)]: 
     queue_list.append(setup_process(log, proc) 

    while process.poll() is None: 
     check_queues(queue_list, errors) 

    while not queue_list[0][0].empty() or queue_list[1][0].empty(): 
     check_queues(queue_list, errors) 

    for queue, FID in queue_list: 
     FID.close() 

return errors 

process = send_cmd('long_program.exe') 
errors = monitor_command(process, stdout_log='stdout.log', stderr_log='stderr.log') 

但標準輸出的輸出文件是空的,而對於錯誤中的輸出文件只有幾行代碼,而兩者都應該是相當大的。

我錯過了什麼?

回答

0

我也曾經..這裏是一些舊的代碼我寫

 


class Process_Communicator(): 

    def join(self): 
     self.te.join() 
     self.to.join() 
     self.running = False 
     self.aggregator.join() 
     self.ti.join() 

    def enqueue_in(self): 
     while self.running and self.p.stdin is not None: 
      while not self.stdin_queue.empty(): 
       s = self.stdin_queue.get() 
       self.p.stdin.write(str(s) + '\n\r') 
      pass 

    def enqueue_output(self): 
     if not self.p.stdout or self.p.stdout.closed: 
      return 
     out = self.p.stdout 
     for line in iter(out.readline, b''): 
      self.qo.put(line) 
     # out.flush() 

    def enqueue_err(self): 
     if not self.p.stderr or self.p.stderr.closed: 
      return 
     err = self.p.stderr 
     for line in iter(err.readline, b''): 
      self.qe.put(line) 

    def aggregate(self): 
     while (self.running): 
      self.update() 
     self.update() 

    def update(self): 
     line = "" 
     try: 
      while self.qe.not_empty: 
       line = self.qe.get_nowait() # or q.get(timeout=.1) 
       self.unbblocked_err += line 
     except Queue.Empty: 
      pass 

     line = "" 
     try: 
      while self.qo.not_empty: 
       line = self.qo.get_nowait() # or q.get(timeout=.1) 
       self.unbblocked_out += line 
     except Queue.Empty: 
      pass 

     while not self.stdin_queue.empty(): 
       s = self.stdin_queue.get() 
       self.p.stdin.write(str(s)) 

    def get_stdout(self, clear=True): 
     ret = self.unbblocked_out 
     if clear: 
      self.unbblocked_out = "" 
     return ret 

    def has_stdout(self): 
     ret = self.get_stdout(False) 
     if ret == '': 
      return None 
     else: 
      return ret 

    def get_stderr(self, clear=True): 
     ret = self.unbblocked_out 
     if clear: 
      self.unbblocked_out = "" 
     return ret 

    def has_stderr(self): 
     ret = self.get_stdout(False) 
     if ret == '': 
      return None 
     else: 
      return ret 

    def __init__(self, subp): 
     '''This is a simple class that collects and aggregates the 
     output from a subprocess so that you can more reliably use 
     the class without having to block for subprocess.communicate.''' 
     self.p = subp 
     self.unbblocked_out = "" 
     self.unbblocked_err = "" 
     self.running = True 
     self.qo = Queue.Queue() 
     self.to = threading.Thread(name="out_read", 
            target=self.enqueue_output, 
            args=()) 
     self.to.daemon = True # thread dies with the program 
     self.to.start() 

     self.qe = Queue.Queue() 
     self.te = threading.Thread(name="err_read", 
            target=self.enqueue_err, 
            args=()) 
     self.te.daemon = True # thread dies with the program 
     self.te.start() 

     self.stdin_queue = Queue.Queue() 
     self.aggregator = threading.Thread(name="aggregate", 
              target=self.aggregate, 
              args=()) 
     self.aggregator.daemon = True # thread dies with the program 
     self.aggregator.start() 
     pass 

你可能不需要整個的例子,但隨意剪切複製粘貼你所需要的。顯示我是如何進行線程處理也很重要。

0

該代碼看起來比任務要求更復雜。我不明白你爲什麼需要在這裏撥打process.poll()queue.get_nowait()。將子進程的stdout/stderr傳遞給多個接收器;您可以從teed_call() that accepts arbitrary file-like objects開始:您可以傳遞日誌文件和特殊文件類對象,這些對象在.write()方法中積累了errors

用最少的更改修復你的代碼;您應該在讀取器線程上調用.join()(即使process.poll()而不是None即,子進程已退出;可能有一些未決的輸出,加入讀取器的線程可確保讀取所有輸出)。

相關問題