2010-06-19 94 views
14

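I need a way to either read all characters currently available in a stream created by Popen, or to find out how many characters are left in the buffer. How can I read all available data from subprocess.Popen.stdout without blocking?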

Background: I want to remote-control an interactive application from Python. So far I use Popen to create a new subprocess:

import subprocess
process = subprocess.Popen(["python"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=workingDir, universal_newlines=True)  # no shell=True with a list; text mode so read() returns str

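(I'm not actually launching Python, but the real interactive interface is similar.) At the moment I read one byte at a time until I detect that the process has reached the command prompt: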

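import sys
def read_until_prompt(process, prompt="SCIP> "):  # hypothetical wrapper name; the original showed only the body
    output = ""
    while not output.endswith(prompt):
        char = process.stdout.read(1)  # blocks until one character arrives
        if not char:  # EOF: the child exited without printing the prompt
            break
        output += char
        sys.stdout.write(char)
    return output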

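Then I start a lengthy computation with process.stdin.write("command\n"). My problem is that I cannot check whether the computation has finished, because I cannot check whether the last characters in the stream are the prompt. read() or read(n) block my thread until they reach EOF, which never happens, because the interactive program does not end until it is told to. Looking for the prompt with a loop like the one above does not work either, because the prompt only appears after the computation is done.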

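Ideally, the solution would let me read all characters currently available in the stream and return an empty string immediately if there is nothing to read.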

+0

Might pexpect (http://www.noah.org/wiki/Pexpect) do what you need? – Mark 2010-06-19 17:45:51

+1

I've looked into that, and it might. But if possible, I'd like a solution that works without external modules. – Perseids 2010-06-19 17:57:08

+1

I'm not sure you can: http://www.python.org/dev/peps/pep-3145/ – 2010-06-19 18:18:16

Answers

3

I poked around and found this really nice solution:

Persistent python subprocess

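It avoids the blocking problem altogether by using fcntl to put the subprocess's pipes into non-blocking mode; no helper threads or polling are needed. I may be missing something, but it solved my interactive process-control problem.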
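A minimal sketch of the fcntl approach, assuming Python 3 on a POSIX system; the helper names `make_nonblocking` and `read_nonblocking` are hypothetical and not taken from the linked article. Once `O_NONBLOCK` is set, `os.read` raises `BlockingIOError` instead of waiting when the pipe is empty, which gives exactly the "return an empty string if there is nothing to read" behaviour asked for:

```python
import fcntl
import os


def make_nonblocking(fd):
    """Set O_NONBLOCK on a file descriptor (POSIX only)."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def read_nonblocking(fd, chunk_size=4096):
    """Return every byte currently in the pipe, or b"" if it is empty."""
    chunks = []
    while True:
        try:
            data = os.read(fd, chunk_size)
        except BlockingIOError:  # EAGAIN: nothing more to read right now
            break
        if not data:  # EOF: the writing end has been closed
            break
        chunks.append(data)
    return b"".join(chunks)
```

With a subprocess you would call `make_nonblocking(process.stdout.fileno())` once and from then on read only through `read_nonblocking`; the file object keeps its own buffer, which `os.read` does not see, so mixing the two can lose data.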

+5

fcntl is Unix-only. Doesn't work on Windows == not a "Python" solution. :) – ddotsenko 2012-06-20 18:29:15

+0

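I have very little experience with Windows programming, but this [link](http://stackoverflow.com/questions/5309262/is-there-way-to-make-file-descriptor-non-blocking-in-windows ) seems to indicate that Windows has an equivalent of non-blocking I/O as well. – 2014-03-19 15:32:13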

+0

And this [link](http://eduunix.ccut.edu.cn/index2/html/python/OReilly%20-%20Python%20Programming%20on%20Win32/pythonwin32_snode133.html) shows how to use the win32file Python module to create pipes in overlapped mode (Windows parlance for non-blocking). – 2014-03-19 15:37:39

0

I don't think readline() will block your process.

line = process.stdout.readline() 

Earlier I tried using

for line in process.stdout: 
    print(line) 

but it seemed to hang until the process terminated.
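The hang described above is what Python 2's file iterator does: `for line in f` fills a hidden read-ahead buffer before yielding anything. A commonly used workaround is to call `readline` through `iter` with a sentinel. A minimal sketch, assuming Python 3 and a hypothetical child that prints three lines; as the comment below points out, each `readline()` call still blocks until a complete line (or EOF) arrives, so this does not help with a prompt that has no trailing newline:

```python
import subprocess
import sys

# Hypothetical child that prints three lines and exits.
CHILD = "print('one'); print('two'); print('three')"


def read_lines(command):
    """Yield decoded lines from a child's stdout as each one arrives."""
    proc = subprocess.Popen(command, stdout=subprocess.PIPE)
    # iter(callable, sentinel) keeps calling readline() until it returns b"" (EOF).
    for raw in iter(proc.stdout.readline, b""):
        yield raw.decode().rstrip("\r\n")
    proc.stdout.close()
    proc.wait()


lines = list(read_lines([sys.executable, "-c", CHILD]))
```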

+0

Unfortunately readline() does block the process. And it won't read the prompt, because the prompt has no newline at the end. – Perseids 2010-06-19 18:20:25

1

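It is not true that read() blocks until EOF: it blocks until it has received as much data as it asked for. On the other side, some data may still be sitting in the child's buffer (because you end your print without a newline, the buffer is not flushed).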

Why not have the child print something like "### OVER ###\n", then call stdout.flush(), and on the parent side collect Popen's stdout until you see the OVER token, e.g. with ''.join(i for i in iter(process.stdout.readline, '### OVER ###\n'))?
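A sketch of the marker idea, assuming you control what the child prints (the child script and the helper name `read_until_marker` are hypothetical). The explicit EOF check avoids the endless loop that `iter(readline, marker)` on its own would fall into if the child exited without ever printing the marker, since `readline()` then keeps returning `''`:

```python
import subprocess
import sys

MARKER = "### OVER ###\n"

# Hypothetical child: prints a result, then the marker, then waits for more input.
CHILD = (
    "import sys\n"
    "print('result: 42')\n"
    "print('### OVER ###')\n"
    "sys.stdout.flush()\n"
    "sys.stdin.readline()\n"
)


def read_until_marker(stream, marker=MARKER):
    """Collect lines from a text stream until the marker line (or EOF)."""
    lines = []
    for line in iter(stream.readline, marker):
        if not line:  # EOF reached before the marker
            break
        lines.append(line)
    return "".join(lines)


proc = subprocess.Popen(
    [sys.executable, "-c", CHILD],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    universal_newlines=True,
)
answer = read_until_marker(proc.stdout)  # returns once the marker line is read
proc.stdin.write("\n")  # let the child's readline() return so it can exit
proc.stdin.close()
proc.wait()
proc.stdout.close()
```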

+0

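That won't help, because I want to do the opposite: not wait until it is ready, but find out that it isn't ready yet. Reading up to the prompt already works very reliably. (Side note: the program I load into the subprocess is a precompiled binary, so I can't change its output; I'm using the interactive interface mainly to spare myself the hassle of the C++ interface.) – Perseids 2010-06-20 01:03:48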

11

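Incremental parsing is not really a problem. Just hand a pipe to a thread and scrub through the output looking for a delimiter. Depending on your preference, the thread can feed the chunks into another pipe/file-like object, or put the parsed "chunks" on a "stack" asynchronously. Here is an example of asynchronous "chunking" of stdout based on a custom delimiter: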

import cStringIO 
import uuid 
import threading 
import os 

class InputStreamChunker(threading.Thread): 
    ''' 
    Threaded object/code that mediates reading output from a stream, 
    detects "separation markers" in the stream and spits out chunks 
    of original stream, split when ends of chunk are encountered. 

    Results are made available as a list of filled file-like objects 
    (your choice). Results are accessible either "asynchronously" 
    (you can poll at will for results in a non-blocking way) or 
    "synchronously" by exposing a "subscribe and wait" system based 
    on threading.Event flags. 

    Usage: 
    - instantiate this object 
    - give our input pipe as "stdout" to other subprocess and start it: 
     Popen(..., stdout = th.input, ...) 
    - (optional) subscribe to data_available event 
    - pull resulting file-like objects off .data 
     (if you are "messing" with .data from outside of the thread, 
     be courteous and wrap the thread-unsafe manipulations between: 
     obj.data_unoccupied.clear() 
     ... mess with .data 
     obj.data_unoccupied.set() 
     The thread will not touch obj.data for the duration and will 
     block reading.) 

    License: Public domain 
    Absolutely no warranty provided 
    ''' 
    def __init__(self, delimiter = None, outputObjConstructor = None): 
     ''' 
     delimiter - the string that will be considered a delimiter for the stream 
     outputObjConstructor - instances of these will be attached to self.data array 
     (instantiator_pointer, args, kw) 
     ''' 
     super(InputStreamChunker,self).__init__() 

     self._data_available = threading.Event() 
     self._data_available.clear() # parent will .wait() on this for results. 
     self._data = [] 
     self._data_unoccupied = threading.Event() 
     self._data_unoccupied.set() # parent will .clear() this while it changes self.data from outside, then .set() it again 
     self._r, self._w = os.pipe() # takes all inputs. self.input = public pipe in. 
     self._stop = False 
     if not delimiter: delimiter = str(uuid.uuid1()) 
     self._stream_delimiter = [l for l in delimiter] 
     self._stream_roll_back_len = (len(delimiter)-1) * -1 
     if not outputObjConstructor: 
      self._obj = (cStringIO.StringIO,(), {}) 
     else: 
      self._obj = outputObjConstructor 
    @property 
    def data_available(self): 
     '''returns a threading.Event instance pointer that is 
     True (and non-blocking to .wait()) when we attached a 
     new IO obj to the .data array. 
     Code consuming the array may decide to set it back to False 
     if it's done with all chunks and wants to be blocked on .wait()''' 
     return self._data_available 
    @property 
    def data_unoccupied(self): 
     '''returns a threading.Event instance pointer that is normally 
     True (and non-blocking to .wait()) Set it to False with .clear() 
     before you start non-thread-safe manipulations (changing) .data 
     array. Set it back to True with .set() when you are done''' 
     return self._data_unoccupied 
    @property 
    def data(self): 
     '''returns a list of input chunks (file-like objects) captured 
     so far. This is a "stack" of sorts. Code consuming the chunks 
     would be responsible for disposing of the file-like objects. 
     By default, the file-like objects are instances of cStringIO''' 
     return self._data 
    @property 
    def input(self): 
     '''This is a file descriptor (not a file-like). 
     It's the input end of our pipe which you give to other process 
     to be used as stdout pipe for that process''' 
     return self._w 
    def flush(self): 
     '''Normally a read on a pipe is blocking. 
     To get things moving (make the subprocess yield the buffer), 
     we inject our chunk delimiter into self.input. 

     This is useful when primary subprocess does not write anything 
     to our in pipe, but we need to make internal pipe reader let go 
     of the pipe and move on with things. 
     ''' 
     os.write(self._w, ''.join(self._stream_delimiter)) 
    def stop(self): 
     self._stop = True 
     self.flush() # reader has its teeth on the pipe. This makes it let go for a sec. 
     os.close(self._w) 
     self._data_available.set() 
    def __del__(self): 
     try: 
      self.stop() 
     except: 
      pass 
     try: 
      del self._w 
      del self._r 
      del self._data 
     except: 
      pass 
    def run(self): 
     ''' Plan: 
     - We read into a fresh instance of IO obj until marker encountered. 
     - When marker is detected, we attach that IO obj to "results" array 
      and signal the calling code (through threading.Event flag) that 
      results are available 
     - repeat until .stop() was called on the thread. 
     ''' 
     marker = ['' for l in self._stream_delimiter] # '' is there on purpose 
     tf = self._obj[0](*self._obj[1], **self._obj[2]) 
     while not self._stop: 
      l = os.read(self._r, 1) 
      print('Thread talking: Ordinal of char is:%s' %ord(l)) 
      trash_str = marker.pop(0) 
      marker.append(l) 
      if marker != self._stream_delimiter: 
       tf.write(l) 
      else: 
       # chopping off the marker first 
       tf.seek(self._stream_roll_back_len, 2) 
       tf.truncate() 
       tf.seek(0) 
       self._data_unoccupied.wait(5) # seriously, how much time is needed to get your items off the stack? 
       self._data.append(tf) 
       self._data_available.set() 
       tf = self._obj[0](*self._obj[1], **self._obj[2]) 
     os.close(self._r) 
     tf.close() 
     del tf 

def waitforresults(ch, answers, expect): 
    while len(answers) < expect: 
     ch.data_available.wait(0.5); ch.data_unoccupied.clear() 
     while ch.data: 
      answers.append(ch.data.pop(0)) 
     ch.data_available.clear(); ch.data_unoccupied.set() 
     print('Main talking: %s answers received, expecting %s\n' % (len(answers), expect)) 

def test(): 
    ''' 
    - set up chunker 
    - set up Popen with chunker's output stream 
    - push some data into proc.stdin 
    - get results 
    - cleanup 
    ''' 

    import subprocess 

    ch = InputStreamChunker('\n') 
    ch.daemon = True 
    ch.start() 

    print('starting the subprocess\n') 
    p = subprocess.Popen(
     ['cat'], 
     stdin = subprocess.PIPE, 
     stdout = ch.input, 
     stderr = subprocess.PIPE) 

    answers = [] 

    i = p.stdin 
    i.write('line1 qwer\n') # will be in results 
    i.write('line2 qwer\n') # will be in results 
    i.write('line3 zxcv asdf') # will be in results only after a ch.flush(), 
           # prepended to other line or when the pipe is closed 
    waitforresults(ch, answers, expect = 2) 

    i.write('line4 tyui\n') # will be in results 
    i.write('line5 hjkl\n') # will be in results 
    i.write('line6 mnbv') # will be in results only after a ch.flush(), 
           # prepended to other line or when the pipe is closed 
    waitforresults(ch, answers, expect = 4) 

    ## now we will flush the rest of input (that last line did not have a delimiter) 
    i.close() 
    ch.flush() 
    waitforresults(ch, answers, expect = 5) 

    should_be = ['line1 qwer', 'line2 qwer', 
     'line3 zxcv asdfline4 tyui', 'line5 hjkl', 'line6 mnbv'] 
    assert should_be == [i.read() for i in answers] 

    # don't forget to stop the chunker. It closes the pipes 
    p.terminate() 
    ch.stop() 
    del p, ch 

if __name__ == '__main__': 
    test() 

Edit: removed the incorrect claim that "writing to the process's stdin is a one-shot thing".
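The class above targets Python 2 (`cStringIO`, and writing `str` to `os.write`, do not work in Python 3). A much smaller Python 3 sketch of the same idea, a reader thread that splits a pipe's output on a delimiter and hands complete chunks to the main thread, could use `queue.Queue` in place of the Event-guarded list. This is not the answer author's code; `start_chunker` is a hypothetical name, and the usage reuses `cat` as in `test()` above (POSIX):

```python
import os
import queue
import subprocess
import threading


def start_chunker(stream, delimiter=b"\n"):
    """Split *stream*'s output on *delimiter* in a daemon thread.

    Complete chunks (without the delimiter) are put on the returned queue;
    leftover bytes are queued at EOF, followed by a final None.
    """
    chunks = queue.Queue()
    fd = stream.fileno()

    def reader():
        buf = b""
        while True:
            data = os.read(fd, 4096)  # blocks only until *some* bytes arrive
            if not data:  # EOF: the child closed its stdout
                break
            buf += data
            *complete, buf = buf.split(delimiter)
            for chunk in complete:
                chunks.put(chunk)
        if buf:
            chunks.put(buf)
        chunks.put(None)

    threading.Thread(target=reader, daemon=True).start()
    return chunks


proc = subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
chunks = start_chunker(proc.stdout)
proc.stdin.write(b"line1\nline2\npartial")
proc.stdin.flush()
first = chunks.get(timeout=5)   # b"line1"
second = chunks.get(timeout=5)  # b"line2"
proc.stdin.close()  # EOF makes cat exit, so the undelimited tail is queued
rest = [chunks.get(timeout=5), chunks.get(timeout=5)]  # [b"partial", None]
proc.wait()
proc.stdout.close()
```

`Queue.get(timeout=...)` plays the role of `data_available.wait()`, and `Queue.get_nowait()` (which raises `queue.Empty`) gives the non-blocking poll; the queue's own locking replaces the `data_unoccupied` handshake.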

+0

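I believe I like your version better than what I actually implemented (see Brian's answer). Even though you read only one character at a time, the multithreaded approach is cleaner. But I disagree with your conclusion about the input stream. At least if you flush it and give the process enough time to read the data, its results come back to you through the output stream. See lines 172-188 of http://codepad.org/Yu7SoORS for an example (I replaced grep with cat). Otherwise, the program I need would fail. – Perseids 2010-06-21 00:51:13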

+0

@蛋糕 Very cool. I stand corrected on my hastily drawn assumption about subprocess.Popen's STDIN. Will adjust the code example. – ddotsenko 2010-06-21 01:41:22

2

There is another possible solution, but it may require you to restructure your program a bit.

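If you have several I/O sources (file descriptors, sockets, etc.) and want to wait on all of them at once, use the Python select module. You can, for example, put standard input (for reading from the terminal) and the pipe from the subprocess into a list and wait until input is ready on either of them. select blocks until I/O is available on at least one descriptor in the list. You then scan the returned list for the descriptors that have data available.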

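This approach is quite efficient, much more so than repeatedly polling file descriptors to see whether any data has arrived. It also has the advantage of simplicity: you can do what you want with very little code, and less code means fewer chances for bugs.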
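A minimal sketch of the select() approach, assuming a POSIX system (on Windows, `select.select()` only accepts sockets, not pipes) and two hypothetical child processes standing in for your real I/O sources. `select()` blocks until at least one pipe is readable, and `os.read()` then returns whatever is available on that pipe without waiting for more:

```python
import os
import select
import subprocess
import sys


def drain_all(streams, chunk_size=4096):
    """Wait on several pipes at once and collect each one's output until EOF."""
    pending = {s.fileno(): s for s in streams}  # fd -> stream that is still open
    collected = {s: [] for s in streams}
    while pending:
        readable, _, _ = select.select(list(pending), [], [])
        for fd in readable:
            data = os.read(fd, chunk_size)
            if data:
                collected[pending[fd]].append(data)
            else:  # EOF: stop watching this descriptor
                del pending[fd]
    return {s: b"".join(parts) for s, parts in collected.items()}


# Two hypothetical children that write at different times.
slow = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(0.2); print('slow')"],
    stdout=subprocess.PIPE,
)
fast = subprocess.Popen([sys.executable, "-c", "print('fast')"], stdout=subprocess.PIPE)
results = drain_all([slow.stdout, fast.stdout])
slow.wait()
fast.wait()
```

In an interactive controller you would put `sys.stdin` and the child's stdout in the same list and react to whichever becomes readable first, instead of draining to EOF.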

+0

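I got it working with poll from the select module. Thanks. My readAllAvailableData() now looks like this: http://codepad.org/ArYdEc3s. The implementation is not efficient at all, but it is fast enough for my purposes. I think the most elegant solution would be Pexpect, as Mark suggested (if you can use external modules). – Perseids 2010-06-20 23:03:26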
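An independent sketch (not the commenter's code at the codepad link) of a `readAllAvailableData()`-style helper built on `select.poll`, assuming POSIX and Python 3. With a zero timeout, `poll()` returns immediately, so the function returns `b""` when nothing is waiting; the child script and the `SCIP> ` prompt are hypothetical stand-ins for the real program:

```python
import os
import select
import subprocess
import sys
import time


def read_all_available_data(stream, timeout_ms=0, chunk_size=4096):
    """Return whatever *stream*'s pipe holds right now (b"" if it is empty).

    Each poll() call waits at most timeout_ms milliseconds.
    """
    fd = stream.fileno()
    poller = select.poll()
    poller.register(fd, select.POLLIN)
    chunks = []
    while poller.poll(timeout_ms):
        data = os.read(fd, chunk_size)  # readable (or hung up), so this won't block
        if not data:  # EOF: POLLHUP with nothing left to read
            break
        chunks.append(data)
    return b"".join(chunks)


# Hypothetical child that prints a prompt without a newline, then waits for input.
CHILD = "import sys; sys.stdout.write('SCIP> '); sys.stdout.flush(); sys.stdin.read()"
proc = subprocess.Popen([sys.executable, "-c", CHILD],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

output = b""
deadline = time.monotonic() + 10
while not output.endswith(b"SCIP> ") and time.monotonic() < deadline:
    output += read_all_available_data(proc.stdout, timeout_ms=100)
nothing_more = read_all_available_data(proc.stdout)  # child is idle, so this returns at once
proc.stdin.close()
proc.wait()
```

Because the function never blocks (with `timeout_ms=0`), the caller can check for the prompt whenever it likes, which is the "find out that it isn't ready yet" behaviour asked for in the question.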

1

I have tried many approaches, such as making stdout non-blocking like this:

import fcntl
import os
fd = output.fileno()  # output: the subprocess's stdout pipe
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)


but the only solution that worked is the one described here:

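import os
import pty
import shlex
import subprocess

master, slave = pty.openpty()

proc = subprocess.Popen(
    shlex.split(command),
    stdout=slave,
    stderr=slave,
    close_fds=True,
    bufsize=0
)
os.close(slave)  # the child has its own copy; closing ours lets reads end when it exits

stdout = os.fdopen(master)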

And then:

while True:
    try:
        out = stdout.readline()
    except OSError:  # Linux raises EIO once the child's side of the pty is closed
        out = ''
    if out == '':
        proc.wait()  # no more output: reap the child and stop
        break
    print(out, end='')
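As noted in the comments on the readline() answer, `readline()` cannot see a prompt that has no trailing newline. A sketch combining a pty with `select` and `os.read` (POSIX only; `read_pty_until_prompt`, the child script, and the `SCIP> ` prompt are assumptions for illustration) returns as soon as the prompt appears. A likely reason the pty helps at all is that C stdio, and therefore many programs, fully buffers stdout when it is a pipe but line-buffers it when it is a terminal:

```python
import os
import pty
import select
import subprocess
import sys


def read_pty_until_prompt(fd, prompt, timeout=10.0):
    """Read from a pty master fd until the output ends with *prompt*."""
    output = b""
    while not output.endswith(prompt):
        ready, _, _ = select.select([fd], [], [], timeout)
        if not ready:
            raise TimeoutError("prompt not seen within %s seconds" % timeout)
        try:
            data = os.read(fd, 4096)  # returns what is available, not a full line
        except OSError:  # Linux: EIO once the child's side of the pty is closed
            data = b""
        if not data:
            break
        output += data
    return output


master, slave = pty.openpty()
CHILD = "import sys; sys.stdout.write('SCIP> '); sys.stdout.flush(); sys.stdin.readline()"
proc = subprocess.Popen(
    [sys.executable, "-c", CHILD],
    stdin=subprocess.PIPE,
    stdout=slave,
    stderr=slave,
    close_fds=True,
)
os.close(slave)  # keep only the child's copy open
banner = read_pty_until_prompt(master, b"SCIP> ")
proc.stdin.write(b"\n")  # send a (hypothetical) command so the child can finish
proc.stdin.close()
proc.wait()
os.close(master)
```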