2010-05-13 131 views
129

我對Python和多線程編程一般都很陌生。基本上,我有一個將文件複製到另一個位置的腳本。我希望將它放置在另一個線程中,以便我可以輸出....來指示腳本仍在運行。在Python中的調用者線程中捕獲一個線程的異常

我遇到的問題是,如果文件不能被複制,它會引發異常。如果在主線程中運行,這是可以的;然而,具有下面的代碼不起作用:

try: 
    threadClass = TheThread(param1, param2, etc.) 
    threadClass.start() ##### **Exception takes place here** 
except: 
    print "Caught an exception" 

在線程類本身,我試圖重新拋出異常,但它不工作。我看到這裏的人問過類似的問題,但他們似乎都在做一些比我想要做的更具體的事情(並且我不太瞭解提供的解決方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪裏或如何使用它。

所有幫助非常感謝!

編輯:爲線程類的代碼如下:

class TheThread(threading.Thread): 
    def __init__(self, sourceFolder, destFolder): 
     threading.Thread.__init__(self) 
     self.sourceFolder = sourceFolder 
     self.destFolder = destFolder 

    def run(self): 
     try: 
      shul.copytree(self.sourceFolder, self.destFolder) 
     except: 
      raise 
+0

你能更深入地瞭解什麼是'TheThread'內發生了什麼?代碼示例或許? – jathanism 2010-05-13 18:43:01

+0

當然。我會編輯上面的回覆以包含一些細節。 – Phanto 2010-05-13 18:44:45

+1

你有沒有考慮切換它,所以主線程是一些東西,進度指示器在產生的線程中? – 2010-05-13 19:00:18

回答

80

問題是thread_obj.start()立即返回。您生成的子線程在其自己的上下文中執行,並擁有自己的堆棧。發生的任何異常都在子線程的上下文中,並且在它自己的堆棧中。我現在可以想到的一種方式是將此信息傳遞給父線程,方法是使用某種消息傳遞方式,以便您可以查看該消息。

試試這個關於大小:

import sys 
import threading 
import Queue 


class ExcThread(threading.Thread): 

    def __init__(self, bucket): 
     threading.Thread.__init__(self) 
     self.bucket = bucket 

    def run(self): 
     try: 
      raise Exception('An error occured here.') 
     except Exception: 
      self.bucket.put(sys.exc_info()) 


def main(): 
    bucket = Queue.Queue() 
    thread_obj = ExcThread(bucket) 
    thread_obj.start() 

    while True: 
     try: 
      exc = bucket.get(block=False) 
     except Queue.Empty: 
      pass 
     else: 
      exc_type, exc_obj, exc_trace = exc 
      # deal with the exception 
      print exc_type, exc_obj 
      print exc_trace 

     thread_obj.join(0.1) 
     if thread_obj.isAlive(): 
      continue 
     else: 
      break 


if __name__ == '__main__': 
    main() 
+2

我剛試過這個,它確實有效。謝謝! :) – Phanto 2010-05-13 23:40:02

+3

爲什麼不加入線程而不是這個醜陋的while循環?請參閱'multiprocessing'等價:https://gist.github.com/2311116 – schlamar 2012-12-11 14:03:39

+1

爲什麼不使用EventHook模式http://stackoverflow.com/questions/1092531/event-system-in-python/1094423#1094423基於@Lasse的答案?而不是循環的東西? – 2014-01-22 23:01:19

0

使用裸節選是不是一個好的做法,因爲你通常捕獲到比你討價還價更多。

我建議修改except只捕捉你想處理的異常。我不認爲提高效果會產生預期的效果,因爲當您在外部try中實例化TheThread時,如果它引發異常,則分配永遠不會發生。

相反,你可能想只是它警覺,繼續前進,如:

def run(self): 
    try: 
     shul.copytree(self.sourceFolder, self.destFolder) 
    except OSError, err: 
     print err 

然後當異常被捕獲,你可以處理它。然後,當外部tryTheThread中發現異常時,您知道它不會是您已經處理的異常,並且將幫助您隔離流程流程。

+1

那麼,如果該線程中有錯誤,我希望完整的程序通知用戶存在問題並優雅地結束。出於這個原因,我想讓主線程捕捉並處理所有異常。但是,如果TheThread拋出一個異常,主線程的try /仍然不能捕獲它,問題仍然存在。我可以讓線程檢測到異常並返回一個錯誤,指示操作失敗。這將達到相同的期望結果,但我仍然想知道如何正確捕獲子線程異常。 – Phanto 2010-05-13 19:35:36

214

你有電話的角度考慮線程。

請考慮這一點。

您致電當地市議會並提出問題。當他們爲你找到答案時,你堅持。當他們有答案時,他們會告訴你,然後你掛斷電話。如果由於某種原因他們找不到答案(例外),他們會告訴你。

這是一個同步,正常的方法調用的工作方式。你調用一個方法,當它返回時,你有答案(好或壞)。)

然而,一個線程進入更多這樣的:

你打電話了當地市議會,並提出一個問題,並要求他們給你打電話回來時,他們找到了答案。你然後掛斷電話。

在這一點上,你不知道他們是否會找到答案,所以現在嘗試處理查詢結果的任何嘗試都將失敗,因爲你根本沒有結果呢。

相反,您必須對來電作出反應,並採取消息,無論好壞,然後處理它。

就您的代碼而言,您需要具有對線程失敗作出反應的代碼,並記錄或處理異常。你說的問題中的代碼,你說的不起作用,就像試圖在掛斷電話後立即處理電話結果,但仍然沒有答案。

22

雖然不可能直接捕獲在其他線程中拋出的異常,但下面的代碼非常透明地獲得了非常接近此功能的內容。您的子線程必須繼承ExThread類而不是threading.Thread,並且父線程必須在等待線程完成其作業時調用child_thread.join_with_exception()方法而不是child_thread.join()

該實現的技術細節:當子線程拋出異常時,它通過Queue傳遞給父項,並在父線程中再次拋出。請注意,這種方法沒有忙於等待。

#!/usr/bin/env python 

import sys 
import threading 
import Queue 

class ExThread(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.__status_queue = Queue.Queue() 

    def run_with_exception(self): 
     """This method should be overriden.""" 
     raise NotImplementedError 

    def run(self): 
     """This method should NOT be overriden.""" 
     try: 
      self.run_with_exception() 
     except BaseException: 
      self.__status_queue.put(sys.exc_info()) 
     self.__status_queue.put(None) 

    def wait_for_exc_info(self): 
     return self.__status_queue.get() 

    def join_with_exception(self): 
     ex_info = self.wait_for_exc_info() 
     if ex_info is None: 
      return 
     else: 
      raise ex_info[1] 

class MyException(Exception): 
    pass 

class MyThread(ExThread): 
    def __init__(self): 
     ExThread.__init__(self) 

    def run_with_exception(self): 
     thread_name = threading.current_thread().name 
     raise MyException("An error in thread '{}'.".format(thread_name)) 

def main(): 
    t = MyThread() 
    t.start() 
    try: 
     t.join_with_exception() 
    except MyException as ex: 
     thread_name = threading.current_thread().name 
     print "Caught a MyException in thread '{}': {}".format(thread_name, ex) 

if __name__ == '__main__': 
    main() 
+1

難道你不想捕獲'BaseException',而不是'Exception'?你所做的只是將異常從一個'Thread'傳播到另一個。現在,IE,一個'KeyboardInterrupt',如果它在後臺線程中產生,它將被默默地忽略。 – ArtOfWarfare 2015-07-24 14:00:14

+0

好主意。更改了這部分代碼。 – 2015-07-27 18:11:00

+0

如果在死線程上第二次調用'join_with_exception',將無限期掛起。修復:https://github.com/fraserharris/threading-extensions/blob/master/threading_extensions.py#L28 – 2016-05-11 20:05:49

10

如果在一個線程中發生了異常,最好的辦法是在join在調用線程重新提出。您可以使用sys.exc_info()函數獲取有關當前正在處理的異常的信息。該信息可以簡單地存儲爲線程對象的屬性,直到調用join,此時可以重新提升它。

注意,一個Queue.Queue(在其他的答案建議)沒有必要在線程拋出最多1例外拋出異常之後完成這個簡單的例子。我們通過簡單地等待線程完成來避免競爭條件。

例如,擴展ExcThread(下面),覆蓋excRun(而不是run)。

的Python 2.x的:

import threading 

class ExcThread(threading.Thread): 
    def excRun(self): 
    pass 

    def run(self): 
    self.exc = None 
    try: 
     # Possibly throws an exception 
     self.excRun() 
    except: 
     import sys 
     self.exc = sys.exc_info() 
     # Save details of the exception thrown but don't rethrow, 
     # just complete the function 

    def join(self): 
    threading.Thread.join(self) 
    if self.exc: 
     msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1]) 
     new_exc = Exception(msg) 
     raise new_exc.__class__, new_exc, self.exc[2] 

Python 3中。X:

raise 3參數的形式消失在Python 3,因此改變它的最後一行:

raise new_exc.with_traceback(self.exc[2]) 
21

concurrent.futures模塊可以很方便地做到在單獨的線程(或過程)的工作和處理任何由此產生的異常:

import concurrent.futures 
import shutil 

def copytree_with_dots(src_path, dst_path): 
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: 
     # Execute the copy on a separate thread, 
     # creating a future object to track progress. 
     future = executor.submit(shutil.copytree, src_path, dst_path) 

     while future.running(): 
      # Print pretty dots here. 
      pass 

     # Return the value returned by shutil.copytree(), None. 
     # Raise any exceptions raised during the copy process. 
     return future.result() 

concurrent.futures包括在Python 3.2,並且可作爲the backported futures module早期版本。

+2

雖然這不完全是OP所要求的,但它正是我需要的提示。謝謝。 – 2015-12-04 17:49:27

2

作爲線程的一個不高興,我花了很長時間才明白如何實現Mateusz Kobos的代碼(上圖)。這是一個澄清的版本,以幫助瞭解如何使用它。

#!/usr/bin/env python 

import sys 
import threading 
import Queue 

class ExThread(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.__status_queue = Queue.Queue() 

    def run_with_exception(self): 
     """This method should be overriden.""" 
     raise NotImplementedError 

    def run(self): 
     """This method should NOT be overriden.""" 
     try: 
      self.run_with_exception() 
     except Exception: 
      self.__status_queue.put(sys.exc_info()) 
     self.__status_queue.put(None) 

    def wait_for_exc_info(self): 
     return self.__status_queue.get() 

    def join_with_exception(self): 
     ex_info = self.wait_for_exc_info() 
     if ex_info is None: 
      return 
     else: 
      raise ex_info[1] 

class MyException(Exception): 
    pass 

class MyThread(ExThread): 
    def __init__(self): 
     ExThread.__init__(self) 

    # This overrides the "run_with_exception" from class "ExThread" 
    # Note, this is where the actual thread to be run lives. The thread 
    # to be run could also call a method or be passed in as an object 
    def run_with_exception(self): 
     # Code will function until the int 
     print "sleeping 5 seconds" 
     import time 
     for i in 1, 2, 3, 4, 5: 
      print i 
      time.sleep(1) 
     # Thread should break here 
     int("str") 
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?   
#   thread_name = threading.current_thread().name 
#   raise MyException("An error in thread '{}'.".format(thread_name)) 

if __name__ == '__main__': 
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet) 
    t = MyThread() 
    # This actually starts the thread 
    t.start() 
    print 
    print ("Notice 't.start()' is considered to have completed, although" 
      " the countdown continues in its new thread. So you code " 
      "can tinue into new processing.") 
    # Now that the thread is running, the join allows for monitoring of it 
    try: 
     t.join_with_exception() 
    # should be able to be replace "Exception" with specific error (untested) 
    except Exception, e: 
     print 
     print "Exceptioon was caught and control passed back to the main thread" 
     print "Do some handling here...or raise a custom exception " 
     thread_name = threading.current_thread().name 
     e = ("Caught a MyException in thread: '" + 
      str(thread_name) + 
      "' [" + str(e) + "]") 
     raise Exception(e) # Or custom class of exception, such as MyException 
0

我喜歡的一種方法是基於observer pattern。我定義了一個線程用來向偵聽器發出異常的信號類。它也可以用來從線程返回值。例如:

import threading 

class Signal: 
    def __init__(self): 
     self._subscribers = list() 

    def emit(self, *args, **kwargs): 
     for func in self._subscribers: 
      func(*args, **kwargs) 

    def connect(self, func): 
     self._subscribers.append(func) 

    def disconnect(self, func): 
     try: 
      self._subscribers.remove(func) 
     except ValueError: 
      raise ValueError('Function {0} not removed from {1}'.format(func, self)) 


class WorkerThread(threading.Thread): 

    def __init__(self, *args, **kwargs): 
     super(WorkerThread, self).__init__(*args, **kwargs) 
     self.Exception = Signal() 
     self.Result = Signal() 

    def run(self): 
     if self._Thread__target is not None: 
      try: 
       self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) 
      except Exception as e: 
       self.Exception.emit(e) 
      else: 
       self.Result.emit(self._return_value) 

if __name__ == '__main__': 
    import time 

    def handle_exception(exc): 
     print exc.message 

    def handle_result(res): 
     print res 

    def a(): 
     time.sleep(1) 
     raise IOError('a failed') 

    def b(): 
     time.sleep(2) 
     return 'b returns' 

    t = WorkerThread(target=a) 
    t2 = WorkerThread(target=b) 
    t.Exception.connect(handle_exception) 
    t2.Result.connect(handle_result) 
    t.start() 
    t2.start() 

    print 'Threads started' 

    t.join() 
    t2.join() 
    print 'Done' 

我沒有足夠的線程經驗來聲稱這是一個完全安全的方法。但它對我有用,我喜歡這種靈活性。

+0

你在join()之後斷開連接嗎? – ealeon 2015-05-11 14:24:31

+0

我不這樣做,但我想這將是一個好主意,所以你沒有提及懸而未決的東西。 – RickardSjogren 2015-05-11 14:43:16

+0

我注意到「handle_exception」仍然是子線程的一部分。需要將它傳遞給線程調用者 – ealeon 2015-05-11 14:44:37

1

類似的方式等RickardSjogren的沒有隊列,SYS等也沒有一些聽衆信號:直接執行,其對應於一個不同的塊中的異常處理程序。

#!/usr/bin/env python3 

import threading 

class ExceptionThread(threading.Thread): 

    def __init__(self, callback=None, *args, **kwargs): 
     """ 
     Redirect exceptions of thread to an exception handler. 

     :param callback: function to handle occured exception 
     :type callback: function(thread, exception) 
     :param args: arguments for threading.Thread() 
     :type args: tuple 
     :param kwargs: keyword arguments for threading.Thread() 
     :type kwargs: dict 
     """ 
     self._callback = callback 
     super().__init__(*args, **kwargs) 

    def run(self): 
     try: 
      if self._target: 
       self._target(*self._args, **self._kwargs) 
     except BaseException as e: 
      if self._callback is None: 
       raise e 
      else: 
       self._callback(self, e) 
     finally: 
      # Avoid a refcycle if the thread is running a function with 
      # an argument that has a member that points to the thread. 
      del self._target, self._args, self._kwargs, self._callback 

只有self._callback和run()中的except塊是普通線程的附加。線程。

4

這個問題有很多非常複雜的答案。我簡單地說明了這一點,因爲這對我來說似乎足夠了。

from threading import Thread 

class PropagatingThread(Thread): 
    def run(self): 
     self.exc = None 
     try: 
      if hasattr(self, '_Thread__target'): 
       # Thread uses name mangling prior to Python 3. 
       self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) 
      else: 
       self.ret = self._target(*self._args, **self._kwargs) 
     except BaseException as e: 
      self.exc = e 

    def join(self): 
     super(PropagatingThread, self).join() 
     if self.exc: 
      raise self.exc 
     return self.ret 

如果您確定你永遠只能在一個或Python的其他版本上運行,你可以下來減少的run()方法只是錯位版本(如果您只需要運行3)之前的Python版本,或者乾淨的版本(如果你只能運行在以3開頭的Python版本上)。

用法示例:

def f(*args, **kwargs) 
    print(args) 
    print(kwargs) 
    raise Exception('I suck') 

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'}) 
t.start() 
t.join() 

,你會看到所提出的其他線程時加入例外。

3

這是一個討厭的小問題,我希望把我的解決方案中的一些其他的解決方案,我發現(async.io例如)看起來前途無量,但也提出了一個黑盒的。隊列/事件循環方法將您與某個實現聯繫起來。 The concurrent futures source code, however, is around only 1000 lines, and easy to comprehend。它使我能夠輕鬆解決我的問題:創建專門的工作線程,而無需太多設置,並能夠在主線程中捕獲異常。

我的解決方案使用併發期貨API和線程API。它允許你創建一個工人,它可以爲你提供線程和未來。這樣,你可以加入線程等待結果:

worker = Worker(test) 
thread = worker.start() 
thread.join() 
print(worker.future.result()) 

...或者你可以讓工人只發送一個回調完成後:

worker = Worker(test) 
thread = worker.start(lambda x: print('callback', x)) 

...或者你可以循環,直到事件結束:

worker = Worker(test) 
thread = worker.start() 

while True: 
    print("waiting") 
    if worker.future.done(): 
     exc = worker.future.exception() 
     print('exception?', exc) 
     result = worker.future.result() 
     print('result', result)   
     break 
    time.sleep(0.25) 

下面的代碼:

from concurrent.futures import Future 
import threading 
import time 

class Worker(object): 
    def __init__(self, fn, args=()): 
     self.future = Future() 
     self._fn = fn 
     self._args = args 

    def start(self, cb=None): 
     self._cb = cb 
     self.future.set_running_or_notify_cancel() 
     thread = threading.Thread(target=self.run, args=()) 
     thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process 
     thread.start() 
     return thread 

    def run(self): 
     try: 
      self.future.set_result(self._fn(*self._args)) 
     except BaseException as e: 
      self.future.set_exception(e) 

     if(self._cb): 
      self._cb(self.future.result()) 

...和測試功能:

def test(*args): 
    print('args are', args) 
    time.sleep(2) 
    raise Exception('foo') 
0

捕獲線程異常並傳回調用方法的一種簡單方法可以是將字典或列表傳遞給worker方法。

例(通過字典來輔助方法):

import threading 

def my_method(throw_me): 
    raise Exception(throw_me) 

def worker(shared_obj, *args, **kwargs): 
    try: 
     shared_obj['target'](*args, **kwargs) 
    except Exception as err: 
     shared_obj['err'] = err 

shared_obj = {'err':'', 'target': my_method} 
throw_me = "Test" 

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={}) 
th.start() 
th.join() 

if shared_obj['err']: 
    print(">>%s" % shared_obj['err']) 
相關問題