正如你已經注意到的,在這種情況下使用鎖會殺死多處理,因爲你基本上已經擁有所有進程等待當前持有STDOUT'權限'的進程的互斥量釋放。但是,並行運行並與您的函數/子進程同步打印在邏輯上是獨佔的。
你可以做的是讓你的主進程作爲你的子進程的'打印機',這樣一旦你的子進程完成/錯誤,然後只有它發回你的主進程打印什麼。你似乎完全滿意印刷不是'實時'(無論如何,正如前面提到的那樣),所以這種方法應該爲你提供恰到好處的服務。所以:
import multiprocessing as mp
import random # just to add some randomness
from time import sleep
def foo(x):
output = ["[Process {}]: foo:".format(x)]
for i in range(5):
output.append('[Process {}] in foo {}'.format(x, i))
sleep(0.2 + 1 * random.random())
return "\n".join(output)
if __name__ == '__main__':
pool = mp.Pool(4)
for res in pool.imap_unordered(foo, range(4)):
print("[MAIN]: Process finished, response:")
print(res) # this will print as soon as one of the processes finishes/errors
pool.close()
,這將給你(因人而異,當然):
[MAIN]: Process finished, response:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] in foo 2
[Process 2] in foo 3
[Process 2] in foo 4
[MAIN]: Process finished, response:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[MAIN]: Process finished, response:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
可以觀察到其他任何東西,包括錯誤的方法相同。
UPDATE - 如果你絕對必須使用其輸出你無法控制的功能,你可以用你的子流程和捕捉他們的標準輸出/ STDERR代替,然後一旦他們這樣做(或拋出異常),你可以回到一切回到過程'經理'打印到實際的標準輸出。有了這樣的設置,我們可以有foo()
喜歡:
def foo(x):
print("[Process {}]: foo:".format(x))
for i in range(5):
print('[Process {}] in foo {}'.format(x, i))
sleep(0.2 + 1 * random.random())
if random.random() < 0.0625: # let's add a 1/4 chance to err:
raise Exception("[Process {}] A random exception is random!".format(x))
return random.random() * 100 # just a random response, you can omit it
請注意,這是意識不到的東西想惹它的操作方式。然後,我們將創建一個外部通用包裝器(因此您不必根據函數對其進行更改)實際上是混亂及其默認行爲(而不僅僅是這個函數,而是與運行時可能調用的所有內容):
def std_wrapper(args):
try:
from StringIO import StringIO # ... for Python 2.x compatibility
except ImportError:
from io import StringIO
import sys
sys.stdout, sys.stderr = StringIO(), StringIO() # replace stdout/err with our buffers
# args is a list packed as: [0] process function name; [1] args; [2] kwargs; lets unpack:
process_name = args[0]
process_args = args[1] if len(args) > 1 else []
process_kwargs = args[2] if len(args) > 2 else {}
# get our method from its name, assuming global namespace of the current module/script
process = globals()[process_name]
response = None # in case a call fails
try:
response = process(*process_args, **process_kwargs) # call our process function
except Exception as e: # too broad but good enough as an example
print(e)
# rewind our buffers:
sys.stdout.seek(0)
sys.stderr.seek(0)
# return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
return sys.stdout.read(), sys.stderr.read(), response
現在我們要做的就是把這種包裝,而不是期望foo()
,併爲其提供信息,什麼代表我們打電話:如果你運行它,所以現在
if __name__ == '__main__':
pool = mp.Pool(4)
# since we're wrapping the process we're calling, we need to send to the wrapper packed
# data with instructions on what to call on our behalf.
# info on args packing available in the std_wrapper function above.
for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):
print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
print(out.rstrip()) # remove the trailing space for niceness, print err if you want
pool.close()
,你會得到這樣的東西:
[MAIN]: Process finished, response: None, STDOUT:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] A random exception is random!
[MAIN]: Process finished, response: 87.9658471743586, STDOUT:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response: 38.929554421661194, STDOUT:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
[MAIN]: Process finished, response: None, STDOUT:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[Process 0] A random exception is random!
儘管foo()
只是打印或錯誤。當然,你可以使用這樣的包裝來調用任何函數,並將任意數量的args/kwargs傳遞給它。
UPDATE#2 - 但是等等!如果我們可以像這樣包裝我們的函數進程,並且捕獲它們的STDOUT/STDERR,我們當然可以把它變成一個裝飾器,並在我們的代碼中使用它來進行簡單的裝飾。所以,我最後提議:
import functools
import multiprocessing
import random # just to add some randomness
import time
def std_wrapper(func):
@functools.wraps(func) # we need this to unravel the target function name
def caller(*args, **kwargs): # and now for the wrapper, nothing new here
try:
from StringIO import StringIO # ... for Python 2.x compatibility
except ImportError:
from io import StringIO
import sys
sys.stdout, sys.stderr = StringIO(), StringIO() # use our buffers instead
response = None # in case a call fails
try:
response = func(*args, **kwargs) # call our wrapped process function
except Exception as e: # too broad but good enough as an example
print(e) # NOTE: the exception is also printed to the captured STDOUT
# rewind our buffers:
sys.stdout.seek(0)
sys.stderr.seek(0)
# return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
return sys.stdout.read(), sys.stderr.read(), response
return caller
@std_wrapper # decorate any function, it won't know you're siphoning its STDOUT/STDERR
def foo(x):
print("[Process {}]: foo:".format(x))
for i in range(5):
print('[Process {}] in foo {}'.format(x, i))
time.sleep(0.2 + 1 * random.random())
if random.random() < 0.0625: # let's add a 1/4 chance to err:
raise Exception("[Process {}] A random exception is random!".format(x))
return random.random() * 100 # just a random response, you can omit it
現在我們可以像以前一樣撥打我們的包裹功能,無需處理參數包裝或諸如此類的事,所以我們回來:
if __name__ == '__main__':
pool = multiprocessing.Pool(4)
for out, err, res in pool.imap_unordered(foo, range(4)):
print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
print(out.rstrip()) # remove the trailing space for niceness, print err if you want
pool.close()
的輸出與前面的例子相同,但是具有更好的可管理性。
這絕對有效,但可以避免更改foo嗎?特別是,我想繼續使用打印。這可能是不可能的,所以我可能不得不吮吸它,只是改變我的實現。另外,如果子進程在運行該函數的過程中失敗,是不是沒有輸出?據我所知,它只會在日誌達到結尾時才輸出。我希望能夠打印出消息的累積情況,而不管它是否已成功運行該功能或未能完成一半。 – mepmerp
@mepmerp - 檢查更新的代碼...這是一種駭人的方法,但它做你想要的工作。 – zwer