2013-03-12 132 views
11

我一直在尋找一個簡單的python線程池模式的很好的實現,實際上找不到任何適合我需要的東西。我正在使用python 2.7,並且我發現的所有模塊都不起作用,或者沒有正確處理工作中的異常。我想知道是否有人知道可以提供我正在尋找的功能類型的庫。非常感謝。處理異常的Python線程池

多重

我第一次嘗試用內置multiprocessing模塊,但是這並沒有使用線程,但子進程,而不是我們碰上的對象不能醃製的問題。不要去這裏。

from multiprocessing import Pool 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = Pool(processes=8) 
for s in samples: pool.apply_async(s.compute_fib, [20]) 
pool.join() 
for s in samples: print s.fib 

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed 

期貨

所以我看到有一些的蟒蛇3.2 here涼爽的併發功能回到港口。這看起來很完美和簡單易用。問題是,當你在其中一個worker中發生異常時,你只會得到類似「ZeroDivisionError」的異常,但沒有回溯,因此沒有指出哪一行導致異常。代碼變得無法調試。不行。

from concurrent import futures 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     1/0 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = futures.ThreadPoolExecutor(max_workers=8) 
threads = [pool.submit(s.compute_fib, 20) for s in samples] 
futures.wait(threads, return_when=futures.FIRST_EXCEPTION) 
for t in threads: t.result() 
for s in samples: print s.fib 


# futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self) 
# 354  def __get_result(self): 
# 355   if self._exception: 
#--> 356    raise self._exception 
# 357   else: 
# 358    return self._result 
# 
# ZeroDivisionError: integer division or modulo by zero 

工作者池

我發現一個其他實施這種模式here的。這一次發生異常時會打印它,但隨後我的ipython交互式解釋器處於掛起狀態,需要從其他shell中終止。不行。

import workerpool 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     1/0 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = workerpool.WorkerPool(size=8) 
for s in samples: pool.map(s.compute_fib, [20]) 
pool.wait() 
for s in samples: print s.fib 

# ZeroDivisionError: integer division or modulo by zero 
# ^C^C^C^C^C^C^C^C^D^D 
# $ kill 1783 

線程池

還有一個其他實施here。這一次發生異常時,它會打印到stderr,但腳本不會中斷,而是繼續執行,這違反了異常的目的,並且可能使事情變得不安全。仍然不可用。

import threadpool 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     1/0 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = threadpool.ThreadPool(8) 
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples] 
requests = [y for x in requests for y in x] 
for r in requests: pool.putRequest(r) 
pool.wait() 
for s in samples: print s.fib 

# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
#---> 17 for s in samples: print s.fib 
# 
#AttributeError: 'Sample' object has no attribute 'fib' 

- 更新 -

看來,關於futures庫,Python 3中的行爲是不一樣的蟒蛇2

futures_exceptions.py

from concurrent.futures import ThreadPoolExecutor, as_completed 

def div_zero(x): 
    return x/0 

with ThreadPoolExecutor(max_workers=4) as executor: 
    futures = executor.map(div_zero, range(4)) 
    for future in as_completed(futures): print(future) 

的Python 2.7.6輸出:

Traceback (most recent call last): 
    File "...futures_exceptions.py", line 12, in <module> 
    for future in as_completed(futures): 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed 
    with _AcquireFutures(fs): 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__ 
    self.futures = sorted(futures, key=id) 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map 
    yield future.result() 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result 
    return self.__get_result() 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result 
    raise self._exception 
ZeroDivisionError: integer division or modulo by zero 

的Python 3.3.2輸出:

Traceback (most recent call last): 
    File "...futures_exceptions.py", line 11, in <module> 
    for future in as_completed(futures): 
    File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed 
    with _AcquireFutures(fs): 
    File "...python3.3/concurrent/futures/_base.py", line 142, in __init__ 
    self.futures = sorted(futures, key=id) 
    File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator 
    yield future.result() 
    File "...python3.3/concurrent/futures/_base.py", line 392, in result 
    return self.__get_result() 
    File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result 
    raise self._exception 
    File "...python3.3/concurrent/futures/thread.py", line 54, in run 
    result = self.fn(*self.args, **self.kwargs) 
    File "...futures_exceptions.py", line 7, in div_zero 
    return x/0 
ZeroDivisionError: division by zero 
+0

它並不能完全解決我經常在調試這些問題所使用的問題,而是一個訣竅是用臨時調用替換調用'pool.map'內建的'map'。 – 2014-01-03 07:03:11

回答

0

簡單的解決辦法:使用任何替代方案最適合你和你的員工實現自己的try-except塊。如果必須的話,圍繞根呼叫。

我不會說這些庫「錯誤地」處理異常。他們有一個默認行爲,不管是原始的。如果違約不適合你,你應該自己處理。

+0

添加'try-execpt'塊不能解決任何問題。在'concurrent'的情況下,在捕獲新的異常之後,我仍然無法到達原始回溯。在'workerpool'的情況下,我從來沒有去過除了解釋器崩潰之前的塊。在'threadpool'的情況下,我從來沒有去過except塊,因爲根本沒有例外。 – xApple 2013-03-12 11:06:21

+1

您正在考慮主線程或進程中的'try'塊。我說你在函數工作進程運行周圍使用'try'塊。如果您希望在工作線程/進程中「拋出」異常並將其發送到主腳本,則需要先將其捕獲到它發生的位置。 – slezica 2013-03-12 11:15:28

+0

那麼我不會爲我想運行的每一個函數寫錯誤處理。所以你說的是我應該寫我自己的全局錯誤處理。是的,我可以選擇其中一個庫並開始編輯源代碼以添加功能,但這正是我想避免的:) – xApple 2013-03-12 12:00:48