2013-06-19 32 views
14

是否有可能低於修改代碼從「標準輸出」和‘標準錯誤’具有打印輸出:Subprocess.Popen:克隆輸出和錯誤既終端和變量

  • 打印終端上(在實時間),
  • 最後存儲在errs變量?

代碼:

#!/usr/bin/python3 
# -*- coding: utf-8 -*- 

import subprocess 

def run_cmd(command, cwd=None): 
    p = subprocess.Popen(command, cwd=cwd, shell=False, 
         stdout=subprocess.PIPE, 
         stderr=subprocess.PIPE) 
    outs, errs = p.communicate() 
    rc = p.returncode 
    outs = outs.decode('utf-8') 
    errs = errs.decode('utf-8') 

    return (rc, (outs, errs)) 

感謝@unutbu,對於@ JF-塞巴斯蒂安特別感謝,最終的功能:

#!/usr/bin/python3 
# -*- coding: utf-8 -*- 


import sys 
from queue import Queue 
from subprocess import PIPE, Popen 
from threading import Thread 


def read_output(pipe, funcs): 
    for line in iter(pipe.readline, b''): 
     for func in funcs: 
      func(line.decode('utf-8')) 
    pipe.close() 


def write_output(get): 
    for line in iter(get, None): 
     sys.stdout.write(line) 


def run_cmd(command, cwd=None, passthrough=True): 
    outs, errs = None, None 

    proc = Popen(
     command, 
     cwd=cwd, 
     shell=False, 
     close_fds=True, 
     stdout=PIPE, 
     stderr=PIPE, 
     bufsize=1 
     ) 

    if passthrough: 

     outs, errs = [], [] 

     q = Queue() 

     stdout_thread = Thread(
      target=read_output, args=(proc.stdout, [q.put, outs.append]) 
      ) 

     stderr_thread = Thread(
      target=read_output, args=(proc.stderr, [q.put, errs.append]) 
      ) 

     writer_thread = Thread(
      target=write_output, args=(q.get,) 
      ) 

     for t in (stdout_thread, stderr_thread, writer_thread): 
      t.daemon = True 
      t.start() 

     proc.wait() 

     for t in (stdout_thread, stderr_thread): 
      t.join() 

     q.put(None) 

     outs = ' '.join(outs) 
     errs = ' '.join(errs) 

    else: 

     outs, errs = proc.communicate() 
     outs = '' if outs == None else outs.decode('utf-8') 
     errs = '' if errs == None else errs.decode('utf-8') 

    rc = proc.returncode 

    return (rc, (outs, errs)) 
+0

的代碼示例確實店'outs'和'錯誤'並返回它們......要打印到終端,只需簡單地「如果超出:打印輸出''如果錯誤:打印錯誤' – bnlucas

+2

@blulucas謝謝,但正如我在第一點所述:輸出應打印在實時到達終端,就像沒有PIPEing一樣。 –

+2

如果您需要Python 3代碼;添加[tag:python-3.x]標籤(我在shebang中看到python3)。你寫的代碼會讓閱讀線程掛起。在Python 3中''''是一個Unicode文字,但'pipe.readline()'默認返回字節(在Python 3上是'''!= b「」')。如果你修復它,那麼編寫器線程不會結束,因爲沒有把''「'放入隊列中。 – jfs

回答

14

你可以生成線程讀取輸出和錯誤管道,寫入一個通用隊列,並追加到列表中。然後使用第三個線程打印隊列中的項目。

import time 
import Queue 
import sys 
import threading 
import subprocess 
PIPE = subprocess.PIPE 


def read_output(pipe, funcs): 
    for line in iter(pipe.readline, ''): 
     for func in funcs: 
      func(line) 
      # time.sleep(1) 
    pipe.close() 

def write_output(get): 
    for line in iter(get, None): 
     sys.stdout.write(line) 

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1) 
q = Queue.Queue() 
out, err = [], [] 
tout = threading.Thread(
    target=read_output, args=(process.stdout, [q.put, out.append])) 
terr = threading.Thread(
    target=read_output, args=(process.stderr, [q.put, err.append])) 
twrite = threading.Thread(target=write_output, args=(q.get,)) 
for t in (tout, terr, twrite): 
    t.daemon = True 
    t.start() 
process.wait() 
for t in (tout, terr): 
    t.join() 
q.put(None) 
print(out) 
print(err) 

之所以使用第三線程 - 而不是第一兩個線程都直接打印又讓到終端 - 是爲了防止同時發生的兩個打印語句,這可導致有時會出現亂碼文本。


上述呼叫random_print.py,打印到輸出和錯誤隨機:

import sys 
import time 
import random 

for i in range(50): 
    f = random.choice([sys.stdout,sys.stderr]) 
    f.write(str(i)+'\n') 
    f.flush() 
    time.sleep(0.1) 

將該溶液借用代碼和理念J. F. Sebastian, here


下面是類Unix系統的替代解決方案,使用select.select

import collections 
import select 
import fcntl 
import os 
import time 
import Queue 
import sys 
import threading 
import subprocess 
PIPE = subprocess.PIPE 

def make_async(fd): 
    # https://stackoverflow.com/a/7730201/190597 
    '''add the O_NONBLOCK flag to a file descriptor''' 
    fcntl.fcntl(
     fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK) 

def read_async(fd): 
    # https://stackoverflow.com/a/7730201/190597 
    '''read some data from a file descriptor, ignoring EAGAIN errors''' 
    # time.sleep(1) 
    try: 
     return fd.read() 
    except IOError, e: 
     if e.errno != errno.EAGAIN: 
      raise e 
     else: 
      return '' 

def write_output(fds, outmap): 
    for fd in fds: 
     line = read_async(fd) 
     sys.stdout.write(line) 
     outmap[fd.fileno()].append(line) 

process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True) 

make_async(process.stdout) 
make_async(process.stderr) 
outmap = collections.defaultdict(list) 
while True: 
    rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], []) 
    write_output(rlist, outmap) 
    if process.poll() is not None: 
     write_output([process.stdout, process.stderr], outmap) 
     break 

fileno = {'stdout': process.stdout.fileno(), 
      'stderr': process.stderr.fileno()} 

print(outmap[fileno['stdout']]) 
print(outmap[fileno['stderr']]) 

該解決方案使用代碼和理念Adam Rosenfield's post, here

+0

你可以在'process.wait()'後面加入'q.put(None)'並退出'None'上的第三個線程,例如'iner iter(get,None):'。另外'pipe.close()'丟失。 – jfs

+0

@ J.F.Sebastian:感謝您的更正。假設'read_output'出於某種原因無法跟上寫入'pipe'的輸出。 (我試着用'時間來模擬它。睡上(1)')。當'time.sleep(1)'被取消註釋時,'out'和'err'在'process.wait()'完成之前未能收集所有的輸出。你知道一種方法來保證'out'和'err'獲得所有輸出嗎? – unutbu

+0

't {err,out} .join()'在put(None)之前''。順便說一句,「實時」,「bufsize = 1」可能會幫助(忽略塊緩衝問題) – jfs

17

捕獲和顯示在同一時間標準輸出和標準錯誤在一個單獨的線程通過線子進程線,你可以使用異步I/O:

#!/usr/bin/env python3 
import asyncio 
import os 
import sys 
from asyncio.subprocess import PIPE 

@asyncio.coroutine 
def read_stream_and_display(stream, display): 
    """Read from stream line by line until EOF, display, and capture the lines. 

    """ 
    output = [] 
    while True: 
     line = yield from stream.readline() 
     if not line: 
      break 
     output.append(line) 
     display(line) # assume it doesn't block 
    return b''.join(output) 

@asyncio.coroutine 
def read_and_display(*cmd): 
    """Capture cmd's stdout, stderr while displaying them as they arrive 
    (line by line). 

    """ 
    # start process 
    process = yield from asyncio.create_subprocess_exec(*cmd, 
      stdout=PIPE, stderr=PIPE) 

    # read child's stdout/stderr concurrently (capture and display) 
    try: 
     stdout, stderr = yield from asyncio.gather(
      read_stream_and_display(process.stdout, sys.stdout.buffer.write), 
      read_stream_and_display(process.stderr, sys.stderr.buffer.write)) 
    except Exception: 
     process.kill() 
     raise 
    finally: 
     # wait for the process to exit 
     rc = yield from process.wait() 
    return rc, stdout, stderr 

# run the event loop 
if os.name == 'nt': 
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows 
    asyncio.set_event_loop(loop) 
else: 
    loop = asyncio.get_event_loop() 
rc, *output = loop.run_until_complete(read_and_display(*cmd)) 
loop.close() 
+0

這段代碼看起來不錯,你可以爲Python 2.7添加一個版本嗎? – kinORnirvana

+0

@kinORnirvana:'asyncio'只能用於Python 3.3+有'trollius' - 一個Python 2的克隆,但[已棄用](http://trollius.readthedocs.org/) – jfs

+0

不錯的工作,J.F!我只是「借」你的代碼[這個答案](http://stackoverflow.com/a/41284244/4014959)。如果您有任何意見,建議和/或更好的答案,他們將不勝感激。 –