2011-07-18 24 views
52

似乎是從multiprocessing.Pool進程引發異常時,沒有堆棧跟蹤或任何其他指示失敗。例如:多處理引發的異常未檢測到池

from multiprocessing import Pool 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 

p = Pool() 
p.apply_async(go) 
p.close() 
p.join() 

打印1並停止靜音。有趣的是,引發一個BaseException反而起作用。有什麼辦法讓所有異常的行爲與BaseException相同嗎?

+1

我有同樣的問題。原因如下:工作進程捕獲異常並在結果隊列中放置失敗代碼和異常。回到主進程中,Pool的結果處理線程獲取失敗代碼並忽略它。某種猴子補丁調試模式可能是可能的。另一種方法是確保你的工作者函數捕獲任何異常,返回它併爲你的處理器打印一個錯誤代碼。 –

+0

這已經在這裏回答:http://stackoverflow.com/a/26096355/512111 – j08lue

回答

23

我對這個問題的合理解決,至少在調試的目的。目前我還沒有一個解決方案可以在主流程中引發異常。我的第一個想法是使用裝飾,但你只能醃functions defined at the top level of a module,所以這是正確的。

取而代之的是一個簡單的包裝類和一個Pool子類,它使用apply_async(因此爲apply)。我將離開map_async作爲讀者的練習。

import traceback 
from multiprocessing.pool import Pool 
import multiprocessing 

# Shortcut to multiprocessing's logger 
def error(msg, *args): 
    return multiprocessing.get_logger().error(msg, *args) 

class LogExceptions(object): 
    def __init__(self, callable): 
     self.__callable = callable 

    def __call__(self, *args, **kwargs): 
     try: 
      result = self.__callable(*args, **kwargs) 

     except Exception as e: 
      # Here we add some debugging help. If multiprocessing's 
      # debugging is on, it will arrange to log the traceback 
      error(traceback.format_exc()) 
      # Re-raise the original exception so the Pool worker can 
      # clean up 
      raise 

     # It was fine, give a normal answer 
     return result 

class LoggingPool(Pool): 
    def apply_async(self, func, args=(), kwds={}, callback=None): 
     return Pool.apply_async(self, LogExceptions(func), args, kwds, callback) 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 

multiprocessing.log_to_stderr() 
p = LoggingPool(processes=1) 

p.apply_async(go) 
p.close() 
p.join() 

這給了我:

1 
[ERROR/PoolWorker-1] Traceback (most recent call last): 
    File "mpdebug.py", line 24, in __call__ 
    result = self.__callable(*args, **kwargs) 
    File "mpdebug.py", line 44, in go 
    raise Exception() 
Exception 
+0

這太糟糕了沒有一個簡單的解決方案(或我的一個錯誤),但這將完成工作 - 謝謝! –

+4

我已經意識到可以使用裝飾器,如果你使用@ functools.wrap(func)來裝飾你的包裝。這使您的裝飾功能看起來像在模塊頂層定義的功能。 –

+1

[本答案](http://stackoverflow.com/a/26096355/512111)中的解決方案更簡單**和**支持重新提升主進程中的錯誤! – j08lue

0

我會嘗試使用PDB:

import pdb 
import sys 
def handler(type, value, tb): 
    pdb.pm() 
sys.excepthook = handler 
+0

它從來沒有達到處理程序在這種情況下,奇怪 –

42

也許我失去了一些東西,但是,這不是什麼結果對象返回的get方法?見Process Pools

類multiprocessing.pool.AsyncResult

類由Pool.apply_async()和Pool.map_async()返回的結果。獲得([超時])
返回結果當它到達。如果超時時間不爲「無」,並且結果未超過 超時秒數,則會引發multiprocessing.TimeoutError。如果遠程調用異常,則該異常將由get()重新調整。

所以,稍微修改你的榜樣,一個可以做

from multiprocessing import Pool 

def go(): 
    print(1) 
    raise Exception("foobar") 
    print(2) 

p = Pool() 
x = p.apply_async(go) 
x.get() 
p.close() 
p.join() 

其中給出的結果

1 
Traceback (most recent call last): 
    File "rob.py", line 10, in <module> 
    x.get() 
    File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get 
    raise self._value 
Exception: foobar 

這並不完全令人滿意,因爲它不打印回溯,但有總比沒有好。

UPDATE:此bug已被固定在Python 3.4,理查德奧德柯克的禮貌。請參閱問題get method of multiprocessing.pool.Async should return full traceback

+0

這很有用,謝謝。我沒有看到。 –

+0

讓我知道如果你找出它爲什麼不返回回溯。由於它能夠返回錯誤值,因此它也應該能夠返回回溯。我可能會問一些合適的論壇 - 也許是一些Python開發列表。順便說一句,正如你可能已經猜到的那樣,我在試圖找出同樣的事情時遇到了你的問題。 :-) –

+4

注意:爲了執行一系列同時運行的任務,您應該將所有結果保存在列表中,然後使用get()遍歷每個結果,如果您不想要第一個錯誤。 – dfrankow

1

我創建了一個模塊RemoteException.py,顯示在一個過程中異常的全面回溯。 Python2。Download it並添加到您的代碼:

import RemoteException 

@RemoteException.showError 
def go(): 
    raise Exception('Error!') 

if __name__ == '__main__': 
    import multiprocessing 
    p = multiprocessing.Pool(processes = 1) 
    r = p.apply(go) # full traceback is shown here 
7

我已經受夠了這個裝飾的成功記錄例外:

import traceback, functools, multiprocessing 

def trace_unhandled_exceptions(func): 
    @functools.wraps(func) 
    def wrapped_func(*args, **kwargs): 
     try: 
      func(*args, **kwargs) 
     except: 
      print 'Exception in '+func.__name__ 
      traceback.print_exc() 
    return wrapped_func 

與問題的代碼,這是

@trace_unhandled_exceptions 
def go(): 
    print(1) 
    raise Exception() 
    print(2) 

p = multiprocessing.Pool(1) 

p.apply_async(go) 
p.close() 
p.join() 

簡單裝飾你傳遞給你的進程池的函數。這項工作的關鍵是@functools.wraps(func),否則多處理將拋出PicklingError。上述

代碼給出

1 
Exception in go 
Traceback (most recent call last): 
    File "<stdin>", line 5, in wrapped_func 
    File "<stdin>", line 4, in go 
Exception 
+0

如果函數並行運行 - 在這種情況下爲go() - 返回一個值,則這不起作用。裝飾器不通過返回值。除此之外,我喜歡這個解決方案。 – MD004

+0

對於傳遞返回值只是修改wrapper_func這樣的: '高清wrapped_func(* ARGS,** kwargs): 結果=無 嘗試: 結果= FUNC(* ARGS,** kwargs)除了 : 打印( '例外'+ func .__ name__) traceback.print_exc() 返回結果' 像魅力一樣工作;) – MoTSCHIGGE

16

與寫作的時候票數最多的解決方案的一個問題:

from multiprocessing import Pool 

def go(): 
    print(1) 
    raise Exception("foobar") 
    print(2) 

p = Pool() 
x = p.apply_async(go) 
x.get() ## waiting here for go() to complete... 
p.close() 
p.join() 

由於@dfrankow指出,它會等待x.get(),這廢墟異步運行任務的點。因此,爲了提高效率(特別是如果你的工人功能go需要很長的時間)我將其更改爲:

from multiprocessing import Pool 

def go(x): 
    print(1) 
    # task_that_takes_a_long_time() 
    raise Exception("Can't go anywhere.") 
    print(2) 
    return x**2 

p = Pool() 
results = [] 
for x in range(1000): 
    results.append(p.apply_async(go, [x])) 

p.close() 

for r in results: 
    r.get() 

優勢:工人功能異步運行,所以例如,如果你正在運行多在幾個核心上的任務,它將比原來的解決方案高效得多。

缺點:如果在工人功能的異常,它只會提高池後已經完成了所有的任務。這可能是也可能不是理想的行爲。編輯根據@ colinfang的評論,這解決了這個問題。

+0

好的努力。但是,由於您的示例是基於假設存在多個結果而預測的,因此可能會擴展一點,以致實際上有多個結果?此外,你寫道:「特別是如果你工作的功能」。那應該是「你的」。 –

+0

你是對的,謝謝。我已經擴展了這個例子。 – gozzilli

+1

很酷。此外,您可能想嘗試/取決於您希望如何容忍提取中的錯誤。 – dfrankow

0

既然你已經使用apply_sync,我猜這個用例是想做一些同步任務。使用回調進行處理是另一種選擇。請注意,此選項僅適用於python3.2及更高版本,python2.7不適用。

from multiprocessing import Pool 

def callback(result): 
    print('success', result) 

def callback_error(result): 
    print('error', result) 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 

p = Pool() 
p.apply_async(go, callback=callback, error_callback=callback_error) 

# You can do another things 

p.close() 
p.join() 
+0

'apply_async'方法沒有'error_callbak',請參考https://docs.python.org/3.1/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool .apply_async – Sanju

+1

適用於更高版本:https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async – Asoul

1
import logging 
from multiprocessing import Pool 

def proc_wrapper(func, *args, **kwargs): 
    """Print exception because multiprocessing lib doesn't return them right.""" 
    try: 
     return func(*args, **kwargs) 
    except Exception as e: 
     logging.exception(e) 
     raise 

def go(x): 
    print x 
    raise Exception("foobar") 

p = Pool() 
p.apply_async(proc_wrapper, (go, 5)) 
p.join() 
p.close() 
0

由於已經爲multiprocessing.Pool提供像樣的答案,我將提供使用的完整性不同的方法解決。

對於python >= 3.2以下解決方案似乎是最簡單的:

from concurrent.futures import ProcessPoolExecutor, wait 

def go(): 
    print(1) 
    raise Exception() 
    print(2) 


futures = [] 
with ProcessPoolExecutor() as p: 
    for i in range(10): 
     futures.append(p.submit(go)) 

results = [f.result() for f in futures] 

優點:

  • 很少的代碼
  • 提出的主要工序異常
  • 提供一個堆棧跟蹤
  • 無外部依存關係

有關API的更多信息請查看:https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

此外,如果您要提交大量的任務,你想你的主要過程,儘快失敗,因爲你的任務之一失敗了,你可以請使用以下代碼片段:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed 
import time 


def go(): 
    print(1) 
    time.sleep(0.3) 
    raise Exception() 
    print(2) 


futures = [] 
with ProcessPoolExecutor(1) as p: 
    for i in range(10): 
     futures.append(p.submit(go)) 

    for f in as_completed(futures): 
     if f.exception() is not None: 
      for f in futures: 
       f.cancel() 
      break 

[f.result() for f in futures] 

所有其他答案只在所有任務執行完成後纔會失敗。