2017-06-11

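Python multiprocessing subprocesses with ordered printing?

I'm trying to run some Python functions in parallel, and they have print commands throughout. What I want is for each subprocess running the same function to write its output to the main stdout as a group. By that I mean I want each subprocess's output to be printed only after it has finished its task. However, if some kind of error occurs along the way, I still want to output whatever the subprocess had done up to that point.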

A small example:

from time import sleep
import multiprocessing as mp


def foo(x):
    print('foo')
    for i in range(5):
        print('Process {}: in foo {}'.format(x, i))
        sleep(0.5)


if __name__ == '__main__':
    pool = mp.Pool()

    jobs = []
    for i in range(4):
        job = pool.apply_async(foo, args=[i])
        jobs.append(job)

    for job in jobs:
        job.wait()

This runs in parallel, but the output is:

foo 
Process 0: in foo 0 
foo 
Process 1: in foo 0 
foo 
Process 2: in foo 0 
foo 
Process 3: in foo 0 
Process 1: in foo 1 
Process 0: in foo 1 
Process 2: in foo 1 
Process 3: in foo 1 
Process 1: in foo 2 
Process 0: in foo 2 
Process 2: in foo 2 
Process 3: in foo 2 
Process 1: in foo 3 
Process 0: in foo 3 
Process 3: in foo 3 
Process 2: in foo 3 
Process 1: in foo 4 
Process 0: in foo 4 
Process 3: in foo 4 
Process 2: in foo 4 

What I want is:

foo 
Process 3: in foo 0 
Process 3: in foo 1 
Process 3: in foo 2 
Process 3: in foo 3 
Process 3: in foo 4 
foo 
Process 1: in foo 0 
Process 1: in foo 1 
Process 1: in foo 2 
Process 1: in foo 3 
Process 1: in foo 4 
foo 
Process 0: in foo 0 
Process 0: in foo 1 
Process 0: in foo 2 
Process 0: in foo 3 
Process 0: in foo 4 
foo 
Process 2: in foo 0 
Process 2: in foo 1 
Process 2: in foo 2 
Process 2: in foo 3 
Process 2: in foo 4 

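The specific order of the processes doesn't matter, as long as all of each subprocess's output is grouped together. Interestingly, I get the output I want if I do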

python test.py > output 

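I know that the subprocesses don't get their own stdout; they all use the main stdout instead. I've thought about this and come up with some solutions, such as using a Queue: each subprocess gets its own stdout, and when it finishes we override the flush command so that the output is sent back to the Queue. Afterwards, we can read the contents. However, although this does satisfy my requirement, the output cannot be retrieved if the function stops midway; it is only output after successful completion. I got this from here: Access standard output of a sub process in python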

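I have also seen locks being used, but that completely kills running the functions in parallel, because each process has to wait for the others to finish executing foo. Also, if possible, I'd like to avoid changing the implementation of my foo function, since I have many functions that would need to change.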

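Edit: I have already looked at the libraries dispy and Parallel Python. Dispy does exactly what I want: it keeps a separate stdout/stderr that I can print out at the end, but the problem is that I have to start the server manually in a separate terminal. I want to be able to run my Python program in one go, without first launching another script. Parallel Python, on the other hand, also does what I want, but it seems to offer little control and has some annoying quirks. In particular, when you print the output, it also prints the function's return type, whereas I only want the printed output. Also, when running a function you have to give it a list of the modules it uses, which is a bit annoying, since I don't want to maintain a long list of imports just to run a simple function.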

Answers

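As you've already noticed, using a lock in this case kills the multiprocessing, because you essentially have all the processes waiting for the mutex to be released by whichever process currently holds the 'rights' to STDOUT. Running in parallel and printing in sync from your functions/subprocesses are, logically, mutually exclusive.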

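What you can do instead is have your main process act as the 'printer' for your subprocesses, so that only once a subprocess finishes/errors does it send back to the main process what to print. You seem perfectly fine with the printing not being 'real-time' (nor could it be anyway, as mentioned above), so this approach should serve you just right. So: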

import multiprocessing as mp
import random  # just to add some randomness
from time import sleep

def foo(x):
    output = ["[Process {}]: foo:".format(x)]
    for i in range(5):
        output.append('[Process {}] in foo {}'.format(x, i))
        sleep(0.2 + 1 * random.random())
    return "\n".join(output)

if __name__ == '__main__':
    pool = mp.Pool(4)
    for res in pool.imap_unordered(foo, range(4)):
        print("[MAIN]: Process finished, response:")
        # printed as soon as any one of the processes finishes
        # (an exception raised inside foo would be re-raised here instead)
        print(res)
    pool.close()
    pool.join()

This will give you (YMMV, of course):

[MAIN]: Process finished, response: 
[Process 2]: foo: 
[Process 2] in foo 0 
[Process 2] in foo 1 
[Process 2] in foo 2 
[Process 2] in foo 3 
[Process 2] in foo 4 
[MAIN]: Process finished, response: 
[Process 0]: foo: 
[Process 0] in foo 0 
[Process 0] in foo 1 
[Process 0] in foo 2 
[Process 0] in foo 3 
[Process 0] in foo 4 
[MAIN]: Process finished, response: 
[Process 1]: foo: 
[Process 1] in foo 0 
[Process 1] in foo 1 
[Process 1] in foo 2 
[Process 1] in foo 3 
[Process 1] in foo 4 
[MAIN]: Process finished, response: 
[Process 3]: foo: 
[Process 3] in foo 0 
[Process 3] in foo 1 
[Process 3] in foo 2 
[Process 3] in foo 3 
[Process 3] in foo 4 

Anything else, including errors, can be reported back the same way.
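For example, errors can travel back in the returned value as well. The sketch below is one way to do it, assuming you are willing to collect lines in foo: the loop is wrapped in try/except, and on failure the traceback text is appended to whatever was collected so far, so the main process still receives the partial output. The fail_at parameter and the run() helper are hypothetical, added only to simulate a mid-run failure deterministically.

```python
import multiprocessing as mp
import traceback


def foo(x, fail_at=None):
    # Collect output lines instead of printing them directly.
    output = ["[Process {}]: foo:".format(x)]
    try:
        for i in range(5):
            if i == fail_at:  # hypothetical knob to simulate a mid-run failure
                raise ValueError("[Process {}] failed at step {}".format(x, i))
            output.append("[Process {}] in foo {}".format(x, i))
    except Exception:
        # Keep everything gathered so far and append the traceback text,
        # so the main process still gets the partial output.
        output.append(traceback.format_exc().rstrip())
    return "\n".join(output)


def run(x):
    # Make process 2 fail halfway through, purely for demonstration.
    return foo(x, fail_at=2 if x == 2 else None)


if __name__ == "__main__":
    with mp.Pool(4) as pool:
        for res in pool.imap_unordered(run, range(4)):
            print("[MAIN]: Process finished, response:")
            print(res)
```

Because the exception is caught inside the worker, imap_unordered never re-raises it, and every group is printed in one piece.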

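UPDATE - If you absolutely must use functions whose output you cannot control, you can wrap your subprocesses and capture their STDOUT/STDERR instead; then, once they are done (or raise an exception), you can return everything to the process 'manager' to print to the actual STDOUT. With such a setup, we can have a foo() like: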

import random
from time import sleep

def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(5):
        print('[Process {}] in foo {}'.format(x, i))
        sleep(0.2 + 1 * random.random())
        if random.random() < 0.0625:  # 1/16 chance per step, roughly 1/4 per call over 5 steps
            raise Exception("[Process {}] A random exception is random!".format(x))
    return random.random() * 100  # just a random response, you can omit it

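Note that it is unaware of anything trying to mess with the way it operates. We'll then create an external, generic wrapper (so you don't have to change it per function) that actually messes with that default behavior (and not just for this function, but for everything else it might call while running):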

def std_wrapper(args):
    try:
        from StringIO import StringIO  # ... for Python 2.x compatibility
    except ImportError:
        from io import StringIO
    import sys
    # args is a sequence packed as: [0] process function name; [1] args; [2] kwargs; let's unpack:
    process_name = args[0]
    process_args = args[1] if len(args) > 1 else []
    process_kwargs = args[2] if len(args) > 2 else {}
    # get our function from its name, assuming the global namespace of the current module/script
    process = globals()[process_name]
    response = None  # in case a call fails
    old_stdout, old_stderr = sys.stdout, sys.stderr  # keep the real streams to restore later
    sys.stdout, sys.stderr = StringIO(), StringIO()  # replace stdout/err with our buffers
    try:
        response = process(*process_args, **process_kwargs)  # call our process function
    except Exception as e:  # too broad, but good enough as an example
        print(e)  # NOTE: the exception message goes to the captured STDOUT
    finally:
        # grab everything captured so far and restore the worker's real streams
        out, err = sys.stdout.getvalue(), sys.stderr.getvalue()
        sys.stdout, sys.stderr = old_stdout, old_stderr
    # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | None
    return out, err, response

Now all we have to do is call this wrapper instead of foo() and give it the information about what to call on our behalf:

if __name__ == '__main__':
    pool = mp.Pool(4)
    # since we're wrapping the function we're calling, we need to send the wrapper packed
    # data with instructions on what to call on our behalf.
    # the packing format is described in the std_wrapper function above.
    for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):
        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # strip the trailing newline for niceness; print err if you want
    pool.close()
    pool.join()

So now if you run it, you'll get something like:

[MAIN]: Process finished, response: None, STDOUT: 
[Process 2]: foo: 
[Process 2] in foo 0 
[Process 2] in foo 1 
[Process 2] A random exception is random! 
[MAIN]: Process finished, response: 87.9658471743586, STDOUT: 
[Process 1]: foo: 
[Process 1] in foo 0 
[Process 1] in foo 1 
[Process 1] in foo 2 
[Process 1] in foo 3 
[Process 1] in foo 4 
[MAIN]: Process finished, response: 38.929554421661194, STDOUT: 
[Process 3]: foo: 
[Process 3] in foo 0 
[Process 3] in foo 1 
[Process 3] in foo 2 
[Process 3] in foo 3 
[Process 3] in foo 4 
[MAIN]: Process finished, response: None, STDOUT: 
[Process 0]: foo: 
[Process 0] in foo 0 
[Process 0] in foo 1 
[Process 0] in foo 2 
[Process 0] in foo 3 
[Process 0] in foo 4 
[Process 0] A random exception is random! 

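This works even though foo() itself only prints or raises. Of course, you can use a wrapper like this to call any function and pass any number of args/kwargs to it.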
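On Python 3 only, the same capture can be sketched with contextlib.redirect_stdout (3.4+) and contextlib.redirect_stderr (3.5+), which restore the real streams automatically, even when the call raises. This variant also sends the function object itself through functools.partial instead of looking its name up in globals(). capture_call is a hypothetical helper name and foo is a deterministic stand-in; like the wrapper above, it only captures Python-level writes to sys.stdout/sys.stderr, not output a C extension or child process writes directly to the file descriptors.

```python
import contextlib
import functools
import io
import multiprocessing as mp
import traceback


def capture_call(func, *args, **kwargs):
    """Run func(*args, **kwargs); return (stdout, stderr, response).

    Output printed before an exception is kept; the traceback is written
    to the captured stderr and response stays None.
    """
    out, err = io.StringIO(), io.StringIO()
    response = None
    # Both context managers put the real streams back on exit.
    with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err):
        try:
            response = func(*args, **kwargs)
        except Exception:
            traceback.print_exc()  # goes to the redirected stderr
    return out.getvalue(), err.getvalue(), response


def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(3):
        print("[Process {}] in foo {}".format(x, i))
        if x == 2 and i == 1:  # deterministic failure for the demo
            raise RuntimeError("[Process {}] failed".format(x))
    return x * 10


if __name__ == "__main__":
    # A partial of a module-level function pickles fine, so no name lookup is needed.
    task = functools.partial(capture_call, foo)
    with mp.Pool(4) as pool:
        for out, err, res in pool.imap_unordered(task, range(4)):
            print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
            print(out.rstrip())
            if err:
                print(err.rstrip())
```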

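UPDATE #2 - But wait! If we can wrap our process functions like this and capture their STDOUT/STDERR, we can surely turn this into a decorator and use it throughout our code with simple decoration. So, my final proposal: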

import functools
import multiprocessing
import random  # just to add some randomness
import time

def std_wrapper(func):
    @functools.wraps(func)  # keeps the original name so pickle can find the function in the workers
    def caller(*args, **kwargs):  # and now for the wrapper, nothing new here
        try:
            from StringIO import StringIO  # ... for Python 2.x compatibility
        except ImportError:
            from io import StringIO
        import sys
        old_stdout, old_stderr = sys.stdout, sys.stderr  # keep the real streams
        sys.stdout, sys.stderr = StringIO(), StringIO()  # use our buffers instead
        response = None  # in case a call fails
        try:
            response = func(*args, **kwargs)  # call our wrapped process function
        except Exception as e:  # too broad, but good enough as an example
            print(e)  # NOTE: the exception is also printed to the captured STDOUT
        finally:
            # collect the captured output and restore the worker's real streams
            out, err = sys.stdout.getvalue(), sys.stderr.getvalue()
            sys.stdout, sys.stderr = old_stdout, old_stderr
        # return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | None
        return out, err, response
    return caller

@std_wrapper  # decorate any function; it won't know you're siphoning its STDOUT/STDERR
def foo(x):
    print("[Process {}]: foo:".format(x))
    for i in range(5):
        print('[Process {}] in foo {}'.format(x, i))
        time.sleep(0.2 + 1 * random.random())
        if random.random() < 0.0625:  # 1/16 chance per step, roughly 1/4 per call over 5 steps
            raise Exception("[Process {}] A random exception is random!".format(x))
    return random.random() * 100  # just a random response, you can omit it

Now we can call our wrapped function just like before, without dealing with argument packing or anything like that, so we're back to:

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    for out, err, res in pool.imap_unordered(foo, range(4)):
        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
        print(out.rstrip())  # strip the trailing newline for niceness; print err if you want
    pool.close()
    pool.join()

The output is the same as in the previous example, but the setup is much more manageable.
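The functools.wraps line is what lets the decorated foo go through Pool at all: Pool pickles the task function by reference (module plus qualified name), and that name has to resolve to the same object. A small sketch comparing both cases; with_wraps, without_wraps, bar and picklable are hypothetical names used only for this illustration.

```python
import functools
import pickle


def with_wraps(func):
    @functools.wraps(func)  # copies __name__, __qualname__, __module__, __doc__
    def caller(*args, **kwargs):
        return func(*args, **kwargs)
    return caller


def without_wraps(func):
    def caller(*args, **kwargs):
        return func(*args, **kwargs)
    return caller


@with_wraps
def foo(x):
    return x * 2


@without_wraps
def bar(x):
    return x * 2


def picklable(obj):
    """Return True if pickle can serialize obj, as Pool must do for its task function."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


if __name__ == "__main__":
    print("foo picklable:", picklable(foo))  # module-level name 'foo' resolves to the wrapper
    print("bar picklable:", picklable(bar))  # 'without_wraps.<locals>.caller' cannot be looked up
```

Without wraps, the inner function's qualified name is without_wraps.<locals>.caller, which pickle refuses to serialize, so Pool would fail before any work starts.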

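This definitely works, but can I avoid changing foo? In particular, I'd like to keep using print. That may not be possible, in which case I'll have to suck it up and change my implementation. Also, if a subprocess fails partway through running the function, won't there be no output at all? As far as I can tell, it only outputs once the log reaches the end. I want to be able to print the accumulated messages regardless of whether the function ran successfully or failed halfway. – mepmerp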

@mepmerp - Check the updated code... it's a hacky approach, but it does what you want. – zwer

You could do something like this:

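from multiprocessing import Lock

# NOTE: a threading.Lock would not be shared between processes at all. A module-level
# multiprocessing.Lock is shared only with workers started via the 'fork' start method.
mutex = Lock()

def foo(x):
    my_buf = ['foo']
    for i in range(5):
        ...  # do something here
        my_buf.append('Process {}: in foo {}'.format(x, i))

    with mutex:  # released even if printing raises
        my_print(my_buf)

def my_print(buffer):
    for line in buffer:
        print(line, flush=True)  # flush while holding the lock so blocks don't interleave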

Just use a lock when printing your output.
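A sketch of the same lock idea that also works with the 'spawn' start method (the default on Windows and macOS): the parent creates one multiprocessing.Lock and hands it to every worker through Pool's initializer/initargs, and the buffer is printed in a finally block so partial output still comes out if the loop raises. init_worker is a hypothetical helper name; the exception itself still propagates and is re-raised by pool.map in the main process.

```python
import multiprocessing as mp

mutex = None  # set in each worker process by init_worker()


def init_worker(lock):
    # Runs once in every worker; keeps a reference to the lock the parent created.
    global mutex
    mutex = lock


def my_print(buffer):
    for line in buffer:
        # flush while still holding the lock, so the block is written out
        # before another worker can start printing
        print(line, flush=True)


def foo(x):
    my_buf = ["foo"]
    try:
        for i in range(5):
            # ... do something here ...
            my_buf.append("Process {}: in foo {}".format(x, i))
    finally:
        # runs on success and on error, so partial output is still printed
        with mutex:
            my_print(my_buf)


if __name__ == "__main__":
    lock = mp.Lock()
    with mp.Pool(4, initializer=init_worker, initargs=(lock,)) as pool:
        pool.map(foo, range(4))
```

Only the printing is serialized; the work inside the loop still runs in parallel.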

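This is similar to zwer's solution, but if the subprocess doesn't finish, the accumulated output won't be printed (as far as I can tell). I'd like to see the accumulated messages whether it succeeded or not. Of course, Python will print a stack trace/exception message, but in my actual program the messages I print give much better information about where things went wrong. – mepmerp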

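@mepmerp As best you can, just put your critical code inside a try/except and handle errors appropriately, so that your subprocess shuts down gracefully. That should be enough for any of the cases you've raised. But if your actual situation is completely different, you'll have to consider other options, such as a Queue from the multiprocessing package. –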
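A rough sketch of the Queue option, assuming task functions are defined at module level (so they pickle): in each worker, sys.stdout is temporarily replaced by a hypothetical QueueWriter that sends every complete line to a shared multiprocessing.Queue tagged with a task id, and a sentinel is queued when the task ends. Lines already sent survive even if the task raises; the main process groups lines per task and prints each group once its sentinel arrives. QueueWriter, init_worker and run_streamed are illustrative names. If a worker process is killed outright (for example by a segfault), its sentinel never arrives and this loop would wait forever, so a real version would need a timeout.

```python
import multiprocessing as mp
import sys

_queue = None  # set in each worker by init_worker()


class QueueWriter:
    """File-like object that sends every complete line to a queue as (task_id, line)."""

    def __init__(self, queue, task_id):
        self.queue = queue
        self.task_id = task_id
        self.partial = ""

    def write(self, text):
        self.partial += text
        while "\n" in self.partial:
            line, self.partial = self.partial.split("\n", 1)
            self.queue.put((self.task_id, line))
        return len(text)

    def flush(self):
        if self.partial:
            self.queue.put((self.task_id, self.partial))
            self.partial = ""


def init_worker(queue):
    global _queue
    _queue = queue


def run_streamed(args):
    """Call func(*func_args) with stdout streamed line by line into the shared queue."""
    func, task_id, func_args = args
    old_stdout = sys.stdout
    sys.stdout = writer = QueueWriter(_queue, task_id)
    try:
        return func(*func_args)
    except Exception as e:
        print("{}: {}".format(type(e).__name__, e))  # the error text is queued too
    finally:
        writer.flush()
        sys.stdout = old_stdout
        _queue.put((task_id, None))  # sentinel: this task has finished


def foo(x):
    print("foo")
    for i in range(5):
        print("Process {}: in foo {}".format(x, i))
        if x == 2 and i == 2:  # deterministic failure for the demo
            raise RuntimeError("Process {} failed".format(x))


if __name__ == "__main__":
    queue = mp.Queue()
    tasks = [(foo, i, (i,)) for i in range(4)]
    groups = {}
    with mp.Pool(4, initializer=init_worker, initargs=(queue,)) as pool:
        pool.map_async(run_streamed, tasks)
        finished = 0
        while finished < len(tasks):
            task_id, line = queue.get()
            if line is None:  # a task is done: print its whole group at once
                print("\n".join(groups.pop(task_id, [])))
                finished += 1
            else:
                groups.setdefault(task_id, []).append(line)
```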