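How do I thread multiple subprocess instances in Python 2.7?

I have three commands that would otherwise be easily chained together on the command line, like so: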
$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput
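In other words, firstCommand processes foo from standard input and pipes the result to secondCommand, which in turn processes that input and pipes its output to thirdCommand, which processes it and redirects its output to the file finalOutput.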
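For reference, the plain shell pipeline above (with no Python processing between stages) can be sketched with subprocess alone by handing each process's stdout to the next process's stdin. This is a minimal sketch under assumptions: POSIX, small input, and a hypothetical stand-in child (a Python one-off that appends a tag to every line) in place of firstCommand, secondCommand and thirdCommand; spawn and run_pipeline are illustrative names.

```python
import subprocess
import sys

# Hypothetical stand-in for the real commands: append argv[1] to each line.
CHILD = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line.rstrip() + sys.argv[1] + chr(10))\n"
)


def spawn(tag, stdin):
    # close_fds=True keeps a child from inheriting the other pipes' ends;
    # it is not the default in Python 2.7.
    return subprocess.Popen(
        [sys.executable, "-c", CHILD, tag],
        stdin=stdin, stdout=subprocess.PIPE, close_fds=True)


def run_pipeline(data):
    first = spawn("-1", subprocess.PIPE)
    second = spawn("-2", first.stdout)
    third = spawn("-3", second.stdout)
    # The parent no longer needs these read ends; only the children use them.
    first.stdout.close()
    second.stdout.close()
    # Fine for small input; large input would need a writer thread.
    first.stdin.write(data)
    first.stdin.close()  # signals EOF to the first stage
    output = third.stdout.read()
    third.stdout.close()
    for proc in (first, second, third):
        proc.wait()
    return output


if __name__ == "__main__":
    sys.stdout.write(repr(run_pipeline(b"foo\n")) + "\n")
```

With these stand-ins, run_pipeline(b"foo\n") should return b"foo-1-2-3\n" on a POSIX system. Closing first.stdin is what lets end-of-file propagate down the chain.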
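I have been trying to reproduce this in a Python script, using threading. I want to use Python to manipulate the output of firstCommand before passing it to secondCommand, and again between secondCommand and thirdCommand.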
Here is an excerpt of the code, which does not seem to work:
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.communicate()
second_process.communicate()
third_process.communicate()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()
When I run this, the script hangs:
$ echo foo | ./myConversionScript.py
** hangs here... **
If I hit Ctrl-C to terminate the script, the code is stuck on the line third_thread.join():
C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt
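If I leave out third_process and third_thread, and instead only pass data from the output of the first thread to the input of the second thread, there is no hang.
Something about the third thread seems to cause things to break, but I don't know why.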
I thought the point of communicate() was that it would handle I/O for the three processes, so I'm not sure why there is an I/O hang.
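As a point of comparison, communicate() operates on a single Popen object: it writes the optional input to that process's stdin, closes stdin, reads stdout and stderr until end-of-file, and waits for the process to exit. It does not coordinate I/O between separate processes. The sketch below shows that behaviour in isolation, assuming a hypothetical stand-in child (a Python one-off that upper-cases its input) rather than the real commands; shout is an illustrative name.

```python
import subprocess
import sys

# Hypothetical stand-in command: upper-case everything read from stdin.
CHILD = "import sys; sys.stdout.write(sys.stdin.read().upper())"


def shout(data):
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # Sends data, closes stdin, reads stdout to EOF, then waits for exit.
    out, err = proc.communicate(data)
    # err is None because stderr was not redirected to a pipe.
    return out, err, proc.returncode


if __name__ == "__main__":
    sys.stdout.write(repr(shout(b"foo\n")) + "\n")
```

In the excerpt above, the communicate() calls come after the join() calls, so a thread blocked in readline() keeps the script from ever reaching them.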
How do I get three or more commands/processes working together, where one thread consumes the output of another thread/process?
UPDATE
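Okay, I made some changes that seem to help, based on some comments here and on other sites. The processes are made to wait() for completion, and within the thread methods I close() the pipes once the thread has processed all the data it can. My concern is that memory usage will be very high for large datasets, but at least things work: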
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.wait()
second_process.wait()
third_process.wait()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()
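The close-at-EOF idea can be tried end to end with stand-in commands. The following is a sketch under assumptions, not the original script: it assumes POSIX, uses a hypothetical child (a Python one-off that appends a tag to each line) in place of the three real commands, passes close_fds=True (not the default in Python 2.7) so no child holds another pipe's write end, and captures the last stage's output only so it can be checked. The names spawn, pump, feed and run are illustrative.

```python
import subprocess
import sys
import threading

# Hypothetical stand-in for the real commands: append argv[1] to each line.
CHILD = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    sys.stdout.write(line.rstrip() + sys.argv[1] + chr(10))\n"
)


def spawn(tag):
    return subprocess.Popen(
        [sys.executable, "-c", CHILD, tag],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True)


def pump(from_stream, to_stream, transform):
    """Copy lines through transform, then close both pipe ends at EOF."""
    try:
        for line in iter(from_stream.readline, b""):
            to_stream.write(transform(line))
            to_stream.flush()
    finally:
        from_stream.close()
        to_stream.close()  # the downstream process now sees EOF


def feed(data, to_stream):
    """Write the initial input, then close so the first stage sees EOF."""
    try:
        to_stream.write(data)
    finally:
        to_stream.close()


def run(data):
    first, second, third = spawn("-1"), spawn("-2"), spawn("-3")
    threads = [
        threading.Thread(target=feed, args=(data, first.stdin)),
        threading.Thread(target=pump,
                         args=(first.stdout, second.stdin, bytes.upper)),
        threading.Thread(target=pump,
                         args=(second.stdout, third.stdin,
                               lambda line: b"> " + line)),
    ]
    for thread in threads:
        thread.start()
    # Captured in memory for demonstration only.
    output = third.stdout.read()
    third.stdout.close()
    for thread in threads:
        thread.join()
    for proc in (first, second, third):
        proc.wait()
    return output


if __name__ == "__main__":
    sys.stdout.write(repr(run(b"foo\nbar\n")) + "\n")
```

With these stand-ins, run(b"foo\nbar\n") should return b"> FOO-1-2-3\n> BAR-1-2-3\n". On the memory concern: each thread holds one line at a time and OS pipes have a bounded buffer, so a write blocks when the downstream stage is slow rather than accumulating data in Python. Only the read() of the final output in this sketch grows with input size, and pointing the last stage at sys.stdout avoids that.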
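Related: [How do I use subprocess.Popen to connect multiple processes by pipes?](http://stackoverflow.com/q/295459/4279) – jfs
More similar examples: [Using POpen to send a variable to Stdin and to send Stdout to a variable](http://stackoverflow.com/q/20789427/4279) – jfs
More: [Python subprocess module, how do I give input to the first of series of piped commands?](http://stackoverflow.com/q/5080402/4279) – jfs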