
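How to thread multiple subprocess instances in Python 2.7?

I have three commands that would otherwise be easily chained together on the command line like so: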

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput 

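In other words, firstCommand processes foo from standard input and pipes the result to secondCommand, which in turn processes that input and pipes its output to thirdCommand, which processes its input and redirects the result to the file finalOutput.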

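I have been trying to recapitulate this in a Python script, using threading. I'd like to use Python to manipulate the output from firstCommand before passing it to secondCommand, and again between secondCommand and thirdCommand.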

Here is an excerpt of code that does not seem to work:

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) 
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) 
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout) 

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin)) 
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin)) 
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin)) 

first_thread.start() 
second_thread.start() 
third_thread.start() 

first_thread.join() 
second_thread.join() 
third_thread.join() 

first_process.communicate() 
second_process.communicate() 
third_process.communicate() 

# read 1K chunks from standard input 
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

When I run this, the script hangs:

$ echo foo | ./myConversionScript.py 
** hangs here... ** 

If I hit Ctrl-C to terminate the script, the code is stuck on the line third_thread.join():

C-c C-c
Traceback (most recent call last):
  File "./myConversionScript.py", line 786, in <module>
    sys.exit(main(*sys.argv))
  File "./myConversionScript.py", line 556, in main
    third_thread.join()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
    self.__block.wait()
  File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
    waiter.acquire()
KeyboardInterrupt

If I don't use a third_process and third_thread, and instead only pass data from the output of the first thread to the input of the second thread, there is no hang.

Something about the third thread seems to cause things to break, but I don't know why.
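One plausible ingredient in hangs like this (an assumption, not something the poster confirmed): readline() on a pipe returns an empty string only after every write end of that pipe has been closed, and neither consumer function in the excerpt ever closes its to_stream. The POSIX-only sketch below (it calls select() on a pipe, which Windows does not support) shows that a single extra write end, created here with os.dup() as a stand-in for a copy held by another thread or child process, is enough to withhold EOF from the reader.

```python
from __future__ import print_function

import os
import select

r, w = os.pipe()
w_copy = os.dup(w)  # stand-in for a duplicate write end held elsewhere

os.write(w, b"foo\n")
os.close(w)

# The buffered data can still be read...
data = os.read(r, 1024)

# ...but while w_copy is open there is no EOF, so r is not readable.
readable_before, _, _ = select.select([r], [], [], 0.1)

os.close(w_copy)

# Once the last write end is closed, r becomes readable and read() returns b"".
readable_after, _, _ = select.select([r], [], [], 0.1)
eof = os.read(r, 1024)
os.close(r)

print(data, readable_before, readable_after, eof)
```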

I thought the point of communicate() was that it would handle the I/O for the three processes, so I'm not sure why there is an I/O hang.
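For reference, here is a minimal sketch of what communicate() does on a single process, with a hypothetical Python one-liner standing in for a real command. It sends the optional input, closes the child's stdin, reads stdout (and stderr, if piped) until EOF, waits for the child to exit, and returns a (stdout, stderr) tuple. Because it drives the process's pipes itself, it is a poor fit for pipes that your own threads are already reading and writing.

```python
from __future__ import print_function

import subprocess
import sys

# Hypothetical stand-in for firstCommand: upper-cases its standard input.
CHILD = [sys.executable, "-c",
         "import sys; sys.stdout.write(sys.stdin.read().upper())"]

proc = subprocess.Popen(CHILD, stdin=subprocess.PIPE, stdout=subprocess.PIPE)

# Sends the input, closes the child's stdin, reads stdout until EOF,
# waits for exit; stderr was not piped, so err is None.
out, err = proc.communicate(b"foo\n")

print(out, err, proc.returncode)
```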

How do I get three or more commands/processes working together, where one thread consumes the output of another thread/process?

UPDATE

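Okay, I made some changes that seem to help, based on some comments here and on other sites. The processes are made to wait() for completion, and within the thread methods, I close() the pipes once the thread has processed all the data it can. My concern is that memory usage will be very high for large datasets, but at least things are working: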

first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) 
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) 
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout) 

first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin)) 
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin)) 
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin)) 

first_thread.start() 
second_thread.start() 
third_thread.start() 

first_thread.join() 
second_thread.join() 
third_thread.join() 

first_process.wait() 
second_process.wait() 
third_process.wait() 

# read 1K chunks from standard input 
def consumeOutputFromStdin(from_stream, to_stream):
    chunk = from_stream.read(1024)
    while chunk:
        to_stream.write(chunk)
        to_stream.flush()
        chunk = from_stream.read(1024)

def consumeOutputFromFirstCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = some_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()

def consumeOutputFromSecondCommand(from_stream, to_stream):
    while True:
        unprocessed_line = from_stream.readline()
        if not unprocessed_line:
            from_stream.close()
            to_stream.close()
            break
        processed_line = a_different_python_function_that_processes_line(unprocessed_line)
        to_stream.write(processed_line)
        to_stream.flush()
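A self-contained sketch of the updated approach, under these assumptions: the three commands are replaced by hypothetical Python one-liners built by stage() that prefix each line with their name; an in-memory io.BytesIO stands in for sys.stdin; and the final output is collected rather than sent to sys.stdout. Unlike the update, it also closes first_process.stdin once the input is exhausted, so firstCommand is guaranteed to see EOF. It passes close_fds=True as a precaution so that no child inherits another child's pipe ends (the default on modern Python 3, not on Python 2.7; POSIX is assumed, since Python 2.7 on Windows rejects close_fds=True with redirected pipes). Since each thread moves one chunk or one line at a time, memory use in this pattern should stay roughly bounded by line length and OS pipe buffers rather than by the total data size.

```python
from __future__ import print_function

import io
import subprocess
import sys
import threading


def stage(name):
    """Hypothetical stand-in for firstCommand/secondCommand/thirdCommand:
    a Python child that copies stdin to stdout, prefixing each line."""
    code = ("import sys\n"
            "for line in iter(sys.stdin.readline, ''):\n"
            "    sys.stdout.write('%s:' + line)\n"
            "    sys.stdout.flush()\n") % name
    return [sys.executable, "-c", code]


def spawn(name):
    # close_fds=True: no child keeps a copy of another child's pipe ends.
    return subprocess.Popen(stage(name), stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, close_fds=True)


def tag_a(line):
    # hypothetical stand-in for some_python_function_that_processes_line
    return b"a" + line


def tag_b(line):
    # hypothetical stand-in for a_different_python_function_that_processes_line
    return b"b" + line


def pump_chunks(from_stream, to_stream):
    """Copy 1K chunks, then close the output so the child sees EOF."""
    try:
        for chunk in iter(lambda: from_stream.read(1024), b""):
            to_stream.write(chunk)
            to_stream.flush()
    finally:
        to_stream.close()


def pump_lines(from_stream, to_stream, process_line):
    """Transform line by line, then close both pipes of this stage."""
    try:
        for line in iter(from_stream.readline, b""):
            to_stream.write(process_line(line))
            to_stream.flush()
    finally:
        to_stream.close()
        from_stream.close()


def run_pipeline(source):
    first, second, third = spawn("first"), spawn("second"), spawn("third")
    threads = [
        threading.Thread(target=pump_chunks, args=(source, first.stdin)),
        threading.Thread(target=pump_lines,
                         args=(first.stdout, second.stdin, tag_a)),
        threading.Thread(target=pump_lines,
                         args=(second.stdout, third.stdin, tag_b)),
    ]
    for t in threads:
        t.start()

    # The question sends this to sys.stdout; here it is collected instead.
    output = third.stdout.read()
    for t in threads:
        t.join()
    codes = [p.wait() for p in (first, second, third)]
    return output, codes


if __name__ == "__main__":
    print(run_pipeline(io.BytesIO(b"foo\nbar\n")))
```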

Related: [How do I use subprocess.Popen to connect multiple processes by pipes?](http://stackoverflow.com/q/295459/4279) – jfs


More similar examples: [Using Popen to send a variable to stdin and stdout to a variable](http://stackoverflow.com/q/20789427/4279) – jfs


More: [Python subprocess module, how do I give input to the first of a series of piped commands?](http://stackoverflow.com/q/5080402/4279) – jfs

Answers


To emulate:

echo foo | 
firstCommand - | somePythonRoutine - | 
secondCommand - | anotherPythonRoutine - | 
thirdCommand - > finalOutput 

your current approach with threads works:

from subprocess import Popen, PIPE 

first = Popen(["firstCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1) 
second = Popen(["secondCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1) 
bind(first.stdout, second.stdin, somePythonRoutine) 
with open("finalOutput", "wb") as file: 
    third = Popen(["thirdCommand", "-"], stdin=PIPE, stdout=file, bufsize=1) 
bind(second.stdout, third.stdin, anotherPythonRoutine) 

# provide input for the pipeline 
first.stdin.write(b"foo\n")  # `echo foo` also emits a trailing newline
first.stdin.close() 

# wait for it to complete 
pipestatus = [p.wait() for p in [first, second, third]] 

where each bind() starts a new thread:

from threading import Thread 

def bind(input_pipe, output_pipe, line_filter):
    def f():
        try:
            for line in iter(input_pipe.readline, b''):
                line = line_filter(line)
                if line:
                    output_pipe.write(line) # no flush unless newline present
        finally:
            try:
                output_pipe.close()
            finally:
                input_pipe.close()
    t = Thread(target=f)
    t.daemon = True # die if the program exits
    t.start()

somePythonRoutine and anotherPythonRoutine accept a single line and return it (possibly modified).
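A runnable sketch of how the pieces of this answer fit together, under these assumptions: a hypothetical cat-like Python one-liner (CAT) stands in for firstCommand, secondCommand and thirdCommand; drop_comments and shout are hypothetical filters standing in for somePythonRoutine and anotherPythonRoutine; and a temporary file stands in for finalOutput. Because bind() only writes truthy results, a filter can drop a line entirely by returning b"" or None. close_fds=True is an added precaution, not part of the answer (POSIX assumed).

```python
import subprocess
import sys
import tempfile
from threading import Thread


def bind(input_pipe, output_pipe, line_filter):
    # Same structure as the answer's bind(): one daemon thread per link.
    def f():
        try:
            for line in iter(input_pipe.readline, b""):
                line = line_filter(line)
                if line:  # a falsy result (b"" or None) drops the line
                    output_pipe.write(line)
        finally:
            try:
                output_pipe.close()  # EOF for the next command
            finally:
                input_pipe.close()
    t = Thread(target=f)
    t.daemon = True
    t.start()


# Hypothetical stand-in for the real commands: copies stdin to stdout.
CAT = [sys.executable, "-c", "import sys; sys.stdout.write(sys.stdin.read())"]


def drop_comments(line):
    # hypothetical somePythonRoutine: discard lines that start with b"#"
    return None if line.startswith(b"#") else line


def shout(line):
    # hypothetical anotherPythonRoutine: upper-case every line
    return line.upper()


def spawn(stdout):
    return subprocess.Popen(CAT, stdin=subprocess.PIPE, stdout=stdout,
                            close_fds=True)


def run(data):
    with tempfile.TemporaryFile() as final_output:  # stands in for finalOutput
        first = spawn(subprocess.PIPE)
        second = spawn(subprocess.PIPE)
        bind(first.stdout, second.stdin, drop_comments)
        third = spawn(final_output)
        bind(second.stdout, third.stdin, shout)

        first.stdin.write(data)
        first.stdin.close()

        pipestatus = [p.wait() for p in (first, second, third)]
        final_output.seek(0)
        return final_output.read(), pipestatus
```

For example, run(b"# header\nfoo\nbar\n") should return the upper-cased lines FOO and BAR with the comment line removed, plus an exit status of 0 for each command.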


The point of communicate() is that it returns the output of the process. This collides with your pipe setup.

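You should only call it once, on the third process; all the other processes are connected via pipes and know how to communicate with each other, so no other manual intervention is needed.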
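For the pure-pipe part of this advice, the subprocess documentation's "Replacing shell pipeline" idiom connects each process's stdout directly to the next process's stdin, so only the last process needs communicate(). A minimal sketch, assuming hypothetical Python one-liners (built by stage()) in place of the real commands and adding close_fds=True as a precaution. Note that this does not let Python code run between stages; that still needs a thread per stage.

```python
from __future__ import print_function

import subprocess
import sys


def stage(tag):
    # Hypothetical stand-in command: prefixes every line with `tag`.
    code = ("import sys\n"
            "for line in sys.stdin:\n"
            "    sys.stdout.write(%r + line)\n") % tag
    return [sys.executable, "-c", code]


first = subprocess.Popen(stage("1:"), stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, close_fds=True)
second = subprocess.Popen(stage("2:"), stdin=first.stdout,
                          stdout=subprocess.PIPE, close_fds=True)
first.stdout.close()  # lets first get SIGPIPE if second exits early
third = subprocess.Popen(stage("3:"), stdin=second.stdout,
                         stdout=subprocess.PIPE, close_fds=True)
second.stdout.close()

first.stdin.write(b"foo\n")
first.stdin.close()

# Only the last process is driven by communicate().
out, _ = third.communicate()
codes = [p.wait() for p in (first, second)] + [third.returncode]
print(out, codes)
```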


Thanks for the answer. Unfortunately, the change you suggested had no effect. –


The next step is to add logging to your code to see which thread is actually doing work and where it might hang. –
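A minimal sketch of that suggestion: name each thread, log with %(threadName)s, and join with a timeout so that a stuck stage gets reported instead of blocking forever (as third_thread.join() did in the traceback). The Sink class and the in-memory input stream are hypothetical stand-ins for the real pipes.

```python
import io
import logging
import threading

# %(threadName)s shows which stage each message comes from.
logging.basicConfig(level=logging.DEBUG,
                    format="%(relativeCreated)6d %(threadName)s: %(message)s")
log = logging.getLogger("pipeline")


def consume(from_stream, to_stream, process_line):
    log.debug("started")
    while True:
        line = from_stream.readline()  # blocking here usually means no EOF upstream
        if not line:
            log.debug("EOF on input, closing output")
            break
        log.debug("read %r", line)
        to_stream.write(process_line(line))
        to_stream.flush()
    to_stream.close()
    log.debug("finished")


class Sink(io.BytesIO):
    """Hypothetical output stream that keeps its contents after close()."""
    def close(self):
        if not self.closed:
            self.result = self.getvalue()
        super(Sink, self).close()


sink = Sink()
worker = threading.Thread(
    target=consume, name="third_thread",
    args=(io.BytesIO(b"foo\nbar\n"), sink, lambda line: line.upper()))
worker.start()

# Join with a timeout so a stuck stage is reported instead of hanging.
worker.join(5)
if worker.is_alive():
    log.warning("%s is still running; check its input pipe", worker.name)
```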