0

我正在使用ThreadPoolExecutor,並且需要在任何工作線程失敗的情況下中止整個計算。如何立即重新引發任何工作線程中引發的異常?

例1這將打印成功不管錯誤的,因爲ThreadPoolExecutor不會自動重新拋出異常。

from concurrent.futures import ThreadPoolExecutor 

def task(): 
    raise ValueError 

with ThreadPoolExecutor() as executor: 
    executor.submit(task) 
print('Success') 

例2這正確地崩潰主線程因爲.result()重新引發的異常。但它等待第一個任務完成,所以主線程會延遲處理異常。

import time 
from concurrent.futures import ThreadPoolExecutor 

def task(should_raise): 
    time.sleep(1) 
    if should_raise: 
     raise ValueError 

with ThreadPoolExecutor() as executor: 
    executor.submit(task, False).result() 
    executor.submit(task, True).result() 
print('Success') 

如何,我注意到在主線程(或多或少)後立即發生,處理故障並中止其餘工人工人例外?

回答

1

首先,我們要求他們的結果之前提交的任務。否則,線程甚至沒有並行運行:

futures = [] 
with ThreadPoolExecutor() as executor: 
    futures.append(executor.submit(good_task)) 
    futures.append(executor.submit(bad_task)) 
for future in futures: 
    future.result() 

現在我們可以將異常信息存儲在一個變量,可用於兩個主線程和工作線程:

exc_info = None 

主要線程不能真正殺死其子進程,所以我們有工作人員檢查的異常信息進行設置和停止:

def good_task(): 
    global exc_info 
    while not exc_info: 
     time.sleep(0.1) 

def bad_task(): 
    global exc_info 
    time.sleep(0.2) 
    try: 
     raise ValueError() 
    except Exception: 
     exc_info = sys.exc_info() 

所有線程結束後,麥n線程可以檢查持有異常信息的變量。如果它被填充,我們重新提出異常:

if exc_info: 
    raise exc_info[0].with_traceback(exc_info[1], exc_info[2]) 
print('Success') 
0

我想,我會實現它這樣的:

我主要的過程中,我創建2個隊列:

  1. 人舉報例外,
  2. 一種方式通知取消。

::

import multiprocessing as mp 

error_queue = mp.Queue() 
cancel_queue = mp.Queue() 

我創建的每個ThreadPoolExecutor,並通過這些隊列作爲參數。

class MyExecutor(concurrent.futures.ThreadPoolExecutor): 
    def __init__(self, error_queue, cancel_queue): 
     self.error_queue : error_queue 
     self.cancel_queue = cancel_queue 

每個ThreadPoolExecutor都有一個主循環。在此循環中,我首先掃描cancel_queue以查看是否有「取消」消息。

在主循環中,我還實現了一個異常管理器。如果發生ERREUR,我提出一個例外:

self.status = "running" 
with True: # <- or something else 
    if not self.cancel_queue.empty(): 
     self.status = "cancelled" 
     break 
    try: 
     # normal processing 
     ... 
    except Exception as exc: 
     # you can log the exception here for debug 
     self.error_queue.put(exc) 
     self.status = "error" 
     break 
    time.sleep(.1) 

的主要工序:

運行所有MyExecutor實例。

掃描error_queue:

所有的
while True: 
    if not error_queue.empty(): 
     cancel_queue.put("cancel") 
    time.sleep(.1)