2011-07-31 41 views
8

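How do I feed a subprocess's standard input from a Python iterator?

I am trying to use the subprocess module in Python to communicate with a process that reads standard input and writes standard output in a streaming fashion. I want the subprocess to read lines from an iterator that produces the input, and then read the output lines from the subprocess. There may not be a one-to-one correspondence between input and output lines. How can I feed a subprocess from an arbitrary iterator that returns strings?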

Here is some example code that gives a simple test case, along with some approaches I have tried that do not work for one reason or another:

#!/usr/bin/python 
from subprocess import * 
# A really big iterator 
input_iterator = ("hello %s\n" % x for x in xrange(100000000)) 

# I thought that stdin could be any iterable, but it actually wants a 
# filehandle, so this fails with an error. 
subproc = Popen("cat", stdin=input_iterator, stdout=PIPE) 

# This works, but it first sends *all* the input at once, then returns 
# *all* the output as a string, rather than giving me an iterator over 
# the output. This uses up all my memory, because the input is several 
# hundred million lines. 
subproc = Popen("cat", stdin=PIPE, stdout=PIPE) 
output, error = subproc.communicate("".join(input_iterator)) 
output_lines = output.split("\n") 

So, how can I have my subprocess read from an iterator line by line while I read from its stdout line by line?

Answers

5

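The easiest way seems to be to fork and feed the input handle from the child process. Can anyone elaborate on any possible downsides of doing this? Or are there Python modules that make it easier and safer?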

#!/usr/bin/python 
from subprocess import * 
import os 

def fork_and_input(input, handle): 
    """Send input to handle in a child process.""" 
    # Make sure input is iterable before forking 
    input = iter(input) 
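    if os.fork():
        # Parent: close our copy of the handle so that the subprocess
        # sees EOF once the child has finished writing.
        handle.close()
    else:
        # Child: write all of the input, then exit without running
        # the parent's cleanup handlers.
        try:
            handle.writelines(input)
            handle.close()
        # An IOError here means some *other* part of the program
        # crashed, so don't complain here.
        except IOError:
            pass
        # os._exit() requires an exit status argument.
        os._exit(0)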

# A really big iterator 
input_iterator = ("hello %s\n" % x for x in xrange(100000000)) 

subproc = Popen("cat", stdin=PIPE, stdout=PIPE) 
fork_and_input(input_iterator, subproc.stdin) 

for line in subproc.stdout: 
    print line, 
+1

If the user calls 'exit()' in the child process, 'SystemExit' is raised. ['os._exit(0)'](https://docs.python.org/2/library/os.html#os._exit) should be used instead. – hakanc

+1

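[Use 'Thread()' instead of 'os.fork()'](http://stackoverflow.com/a/32331150/4279) for portability and to avoid various hard-to-debug problems. Here is an example of a problem that 'os.fork()' can cause: [Locks in the standard library should be sanitized on fork](http://bugs.python.org/issue6721) – jfs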
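The suggestion to use a thread instead of 'os.fork()' can be sketched roughly as follows. This is a minimal Python 3 sketch, not the code from the linked answer: the helper names `feed` and `run_through` and the `CAT` stand-in command are assumptions made for illustration. One thread writes the iterator to the child's stdin while the main thread reads the child's stdout, so neither side has to hold the whole stream in memory.

```python
import subprocess
import sys
import threading

# Hypothetical stand-in for "cat", so the sketch also runs where cat is missing.
CAT = [sys.executable, "-c", "import sys; sys.stdout.writelines(sys.stdin)"]

def feed(lines, pipe):
    """Write every line from the iterator to pipe, then close it to signal EOF."""
    try:
        for line in lines:
            pipe.write(line)
    except BrokenPipeError:
        pass  # the child exited before reading all of its input
    finally:
        try:
            pipe.close()
        except BrokenPipeError:
            pass  # flushing the last buffered data failed for the same reason

def run_through(cmd, lines):
    """Yield the child's output lines while a thread feeds its stdin."""
    with subprocess.Popen(cmd, stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE, text=True) as proc:
        feeder = threading.Thread(target=feed, args=(lines, proc.stdin),
                                  daemon=True)
        feeder.start()
        for line in proc.stdout:
            yield line
        feeder.join()

# A small iterator for demonstration; a much larger one is fed the same way.
input_iterator = ("hello %s\n" % x for x in range(1000))
output = list(run_through(CAT, input_iterator))
```

Because the writer and the reader run independently, this should also cope with a child that emits nothing until it has seen all of its input: the reading loop simply blocks until output starts to arrive.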

0

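Take a look at this recipe, which adds asynchronous I/O support to subprocess. However, it still requires your subprocess to respond to each input line or group of lines.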

+1

I cannot guarantee that the program will produce output for every line of input. In fact, it probably won't. –

+0

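Sorry, I wasn't very precise: what I meant is that your main process should be able to feed your subprocess enough input for it to produce some output, read that output, feed the subprocess more input, and so on in a loop. If that is the case, the recipe my link points to may help you. The point is that your subprocess should be able to start producing output before it has seen all of its input. –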

+0

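Hmm. There may be a sort step in my pipeline (depending on options), so it probably won't produce most of its output until it has received all of the input. –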

2

To feed a process's standard input from a Python iterator:

#!/usr/bin/env python3 
from subprocess import Popen, PIPE 

with Popen("sink", stdin=PIPE, bufsize=-1) as process: 
    for chunk in input_iterator: 
        process.stdin.write(chunk)
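The snippet above leaves both "sink" and input_iterator undefined. A self-contained variant might look like the sketch below. The `SINK` command is a hypothetical stand-in that only counts the lines it receives, and the iterator yields bytes because the pipe is opened in binary mode by default.

```python
import subprocess
import sys

# Hypothetical stand-in for "sink": exits with status 0 only if it read 1000 lines.
SINK = [sys.executable, "-c",
        "import sys; n = sum(1 for _ in sys.stdin.buffer); "
        "sys.exit(0 if n == 1000 else 1)"]

# The pipe is binary by default, so the iterator has to yield bytes.
input_iterator = (b"hello %d\n" % x for x in range(1000))

with subprocess.Popen(SINK, stdin=subprocess.PIPE, bufsize=-1) as process:
    for chunk in input_iterator:
        process.stdin.write(chunk)

# Leaving the with-block closes process.stdin (EOF for the child)
# and waits for the child to exit.
exit_status = process.returncode
```

This write-only pattern is safe only while nothing needs to be read back from the child. Once stdout is also a pipe, the writer can block on a full pipe buffer, which is why concurrent reading is needed in that case.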

If you want to read the output at the same time, you need threads or asyncio:

#!/usr/bin/env python3 
import asyncio 
import sys 
from asyncio.subprocess import PIPE 
from contextlib import closing 

async def writelines(writer, lines): 
    # NOTE: can't use writer.writelines(lines) here because it tries to write 
    # all at once 
    with closing(writer): 
        for line in lines:
            writer.write(line)
            await writer.drain()

async def main(): 
    input_iterator = (b"hello %d\n" % x for x in range(100000000)) 
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE) 
    asyncio.ensure_future(writelines(process.stdin, input_iterator)) 
    async for line in process.stdout: 
        sys.stdout.buffer.write(line)
    return await process.wait() 

# asyncio.run() needs Python 3.7+. Since Python 3.8 the default event loop
# on Windows is the proactor loop, which supports subprocess pipes.
sys.exit(asyncio.run(main()))