2017-10-05 193 views
1

我正在使用threading.Thread和t.start()與可調參數列表來執行長時間運行的多線程處理。我的主線程被阻塞,直到所有線程完成。但是,如果其中一個可調參數拋出異常並終止其他線程,我希望t.start()立即返回。中斷主調用線程如果子線程拋出異常

使用t.join()檢查執行的線程是否提供了有關因異常導致的失敗的信息。

這裏是代碼:

import json 
import requests 


class ThreadServices: 
    def __init__(self): 
     self.obj = "" 

    def execute_services(self, arg1, arg2): 
     try: 
      result = call_some_process(arg1, arg2) #some method 
      #save results somewhere 
     except Exception, e: 
      # raise exception 
      print e 

    def invoke_services(self, stubs): 
     """ 
     Thread Spanning Function   
     """ 
     try: 
      p1 = "" #some value 
      p2 = "" #some value 
      # Call service 1 
      t1 = threading.Thread(target=self.execute_services, args=(a, b,) 

      # Start thread 
      t1.start() 
      # Block till thread completes execution 
      t1.join() 

      thread_pool = list() 
      for stub in stubs: 
       # Start parallel execution of threads 
       t = threading.Thread(target=self.execute_services, 
              args=(p1, p2)) 
       t.start() 
       thread_pool.append(t) 
      for thread in thread_pool: 
       # Block till all the threads complete execution: Wait for all 
       the parallel tasks to complete 
       thread.join() 

      # Start another process thread 
      t2 = threading.Thread(target=self.execute_services, 
              args=(p1, p2) 
      t2.start() 
      # Block till this thread completes execution 
      t2.join() 

      requests.post(url, data= json.dumps({status_code=200})) 
     except Exception, e: 
      print e 
      requests.post(url, data= json.dumps({status_code=500})) 
     # Don't return anything as this function is invoked as a thread from 
     # main calling function 


class Service(ThreadServices): 
    """ 
    Service Class 
    """ 

    def main_thread(self, request, context): 
     """ 
     Main Thread:Invokes Task Execution Sequence in ThreadedService   
     :param request: 
     :param context: 
     :return: 
     """ 
     try: 
      main_thread = threading.Thread(target=self.invoke_services, 
             args=(request,)) 
      main_thread.start() 
      return True 
     except Exception, e: 
      return False 

當我打電話服務()main_thread(請求,上下文)並有一些例外執行T1,我需要得到它在main_thread提出並返回false。我怎樣才能實現這個結構。謝謝!!

回答

0

首先,你太複雜了。我會這樣做:

from thread import start_new_thread as thread 
from time import sleep 

class Task: 
    """One thread per task. 
    This you should do with subclassing threading.Thread(). 
    This is just conceptual example. 
    """ 
    def __init__ (self, func, args=(), kwargs={}): 
     self.func = func 
     self.args = args 
     self.kwargs = kwargs 
     self.error = None 
     self.done = 0 
     self.result = None 

    def _run (self): 
     self.done = 0 
     self.error = None 
     self.result = None 
     # So this is what you should do in subclassed Thread(): 
     try: self.result = self.func(*self.args, **self.kwargs) 
     except Exception, e: 
      self.error = e 
     self.done = 1 

    def start (self): 
     thread(self._run,()) 

    def wait (self, retrexc=1): 
     """Used in place of threading.Thread.join(), but it returns the result of the function self.func() and manages errors..""" 
     while not self.done: sleep(0.001) 
     if self.error: 
      if retrexc: return self.error 
      raise self.error 
     return self.result 

# And this is how you should use your pool: 
def do_something (tasknr): 
    print tasknr-20 
    if tasknr%7==0: raise Exception, "Dummy exception!" 
    return tasknr**120/82.0 

pool = [] 
for task in xrange(20, 50): 
    t = Task(do_something, (task,)) 
    pool.append(t) 
# And only then wait for each one: 
results = [] 
for task in pool: 
    results.append(task.wait()) 
print results 

這樣你可以使task.wait()引發錯誤。該線程已經停止。所以你所要做的就是在完成後從池中或整個池中刪除它們的引用。你甚至可以:

results = [] 
for task in pool: 
    try: results.append(task.wait(0)) 
    except Exception, e: 
     print task.args, "Error:", str(e) 
print results 

現在,沒有嚴格的(我的意思是任務()類)的使用,因爲它需要很多的東西加入到用於實際。

只需子類threading.Thread()並通過重寫run()和join()或添加新的函數(如wait())來實現類似的概念。

+0

謝謝@Dalen。這種方法至少對我有用。一些解決方法,它啓動。 – Zeus