我一直在尋找一個簡單的python線程池模式的很好的實現,實際上找不到任何適合我需要的東西。我正在使用python 2.7,並且我發現的所有模塊都不起作用,或者沒有正確處理工作中的異常。我想知道是否有人知道可以提供我正在尋找的功能類型的庫。非常感謝。處理異常的Python線程池
多重
我第一次嘗試用內置multiprocessing
模塊,但是這並沒有使用線程,但子進程,而不是我們碰上的對象不能醃製的問題。不要去這裏。
from multiprocessing import Pool
class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5)/2
self.fib = int(round((phi**n - (1-phi)**n)/5**0.5))
samples = [Sample() for i in range(8)]
pool = Pool(processes=8)
for s in samples: pool.apply_async(s.compute_fib, [20])
pool.join()
for s in samples: print s.fib
# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
期貨
所以我看到有一些的蟒蛇3.2 here涼爽的併發功能回到港口。這看起來很完美和簡單易用。問題是,當你在其中一個worker中發生異常時,你只會得到類似「ZeroDivisionError」的異常,但沒有回溯,因此沒有指出哪一行導致異常。代碼變得無法調試。不行。
from concurrent import futures
class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5)/2
1/0
self.fib = int(round((phi**n - (1-phi)**n)/5**0.5))
samples = [Sample() for i in range(8)]
pool = futures.ThreadPoolExecutor(max_workers=8)
threads = [pool.submit(s.compute_fib, 20) for s in samples]
futures.wait(threads, return_when=futures.FIRST_EXCEPTION)
for t in threads: t.result()
for s in samples: print s.fib
# futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self)
# 354 def __get_result(self):
# 355 if self._exception:
#--> 356 raise self._exception
# 357 else:
# 358 return self._result
#
# ZeroDivisionError: integer division or modulo by zero
工作者池
我發現一個其他實施這種模式here的。這一次發生異常時會打印它,但隨後我的ipython交互式解釋器處於掛起狀態,需要從其他shell中終止。不行。
import workerpool
class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5)/2
1/0
self.fib = int(round((phi**n - (1-phi)**n)/5**0.5))
samples = [Sample() for i in range(8)]
pool = workerpool.WorkerPool(size=8)
for s in samples: pool.map(s.compute_fib, [20])
pool.wait()
for s in samples: print s.fib
# ZeroDivisionError: integer division or modulo by zero
# ^C^C^C^C^C^C^C^C^D^D
# $ kill 1783
線程池
還有一個其他實施here。這一次發生異常時,它會打印到stderr
,但腳本不會中斷,而是繼續執行,這違反了異常的目的,並且可能使事情變得不安全。仍然不可用。
import threadpool
class Sample(object):
def compute_fib(self, n):
phi = (1 + 5**0.5)/2
1/0
self.fib = int(round((phi**n - (1-phi)**n)/5**0.5))
samples = [Sample() for i in range(8)]
pool = threadpool.ThreadPool(8)
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples]
requests = [y for x in requests for y in x]
for r in requests: pool.putRequest(r)
pool.wait()
for s in samples: print s.fib
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
# ZeroDivisionError: integer division or modulo by zero
#---> 17 for s in samples: print s.fib
#
#AttributeError: 'Sample' object has no attribute 'fib'
- 更新 -
看來,關於futures
庫,Python 3中的行爲是不一樣的蟒蛇2
futures_exceptions.py
:
from concurrent.futures import ThreadPoolExecutor, as_completed
def div_zero(x):
return x/0
with ThreadPoolExecutor(max_workers=4) as executor:
futures = executor.map(div_zero, range(4))
for future in as_completed(futures): print(future)
的Python 2.7.6輸出:
Traceback (most recent call last):
File "...futures_exceptions.py", line 12, in <module>
for future in as_completed(futures):
File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed
with _AcquireFutures(fs):
File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__
self.futures = sorted(futures, key=id)
File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map
yield future.result()
File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result
return self.__get_result()
File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result
raise self._exception
ZeroDivisionError: integer division or modulo by zero
的Python 3.3.2輸出:
Traceback (most recent call last):
File "...futures_exceptions.py", line 11, in <module>
for future in as_completed(futures):
File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed
with _AcquireFutures(fs):
File "...python3.3/concurrent/futures/_base.py", line 142, in __init__
self.futures = sorted(futures, key=id)
File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator
yield future.result()
File "...python3.3/concurrent/futures/_base.py", line 392, in result
return self.__get_result()
File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result
raise self._exception
File "...python3.3/concurrent/futures/thread.py", line 54, in run
result = self.fn(*self.args, **self.kwargs)
File "...futures_exceptions.py", line 7, in div_zero
return x/0
ZeroDivisionError: division by zero
它並不能完全解決我經常在調試這些問題所使用的問題,而是一個訣竅是用臨時調用替換調用'pool.map'內建的'map'。 – 2014-01-03 07:03:11