2014-09-22 68 views
3

我正在使用Python 2.7。Python中ThreadPool中每個線程的超時

我目前使用ThreadPoolExecuter這樣的:

params = [1,2,3,4,5,6,7,8,9,10] 
with concurrent.futures.ThreadPoolExecutor(5) as executor: 
    result = list(executor.map(f, params)) 

問題是f有時運行時間過長。每當我運行f,我想限制它的運行到100秒,然後殺死它。

最終,對於xparam中的每個元素,我想指出是否必須殺死f,如果不是 - 返回值是多少。 即使f超時一個參數,我仍然想運行它與下一個參數。

executer.map方法確實有一個timeout參數,但它爲整個運行設置了一個超時時間,從調用時間到executer.map,而不是分別爲每個線程。

什麼是最簡單的方法來獲得我想要的行爲?

+1

沒有直接的方法來殺死Python中的線程。如果傳遞給map的timeout超時,它不會實際終止執行程序線程,它只會使'future.result(timeout)'調用它在內部引發'TimeoutError'異常。雖然,工作線程將繼續在後臺運行。如果您需要線程實際被終止,您需要讓您的工作人員函數檢查父代在超時過期後可以設置的某種標誌。然而,這可能並不容易實現,這取決於工作人員功能在做什麼。 – dano 2014-09-22 14:35:21

+0

@dano:我明白了。仍然在後臺運行的過程是我可能能夠忍受的事情。但讓我們說線程處理參數[4]卡住了,我仍然可以得到處理params [5]的參數params [9]的結果嗎? – user302099 2014-09-22 15:32:41

+0

@ user302099:如果在'params [4]'之前準備好了,你可以使用'as_completed()'而不是'map()'來得到'params [5]'結果。如果你使用線程,那麼函數應該配合(尊重退出條件)。如果你不能依賴這個函數來行爲,那麼就使用進程。 – jfs 2014-09-22 15:52:41

回答

3

這個答案是關於python的多處理庫,它通常比線程庫更可取,除非你的函數正在等待網絡調用。請注意,多處理和線程庫具有相同的接口。

鑑於您每個進程運行潛在的100秒,相比之下,創建每個進程的開銷相當小。您可能必須制定自己的流程才能獲得必要的控制權。

一種選擇是包裝在另一個函數f將爲是100秒exectue:

from multiprocessing import Pool 

def timeout_f(arg): 
    pool = Pool(processes=1) 
    return pool.apply_async(f, [arg]).get(timeout=100) 

然後你的代碼更改爲:

result = list(executor.map(timeout_f, params)) 

或者,你可以寫您自己的線程/過程控制:

from multiprocessing import Process 
from time import time 

def chunks(l, n): 
    """ Yield successive n-sized chunks from l. """ 
    for i in xrange(0, len(l), n): 
     yield l[i:i+n] 

processes = [Process(target=f, args=(i,)) for i in params] 
exit_codes = [] 
for five_processes = chunks(processes, 5): 
    for p in five_processes: 
     p.start() 
    time_waited = 0 
    start = time() 
    for p in five_processes: 
     if time_waited >= 100: 
      p.join(0) 
      p.terminate() 
     p.join(100 - time_waited) 
     p.terminate() 
     time_waited = time() - start 
    for p in five_processes: 
     exit_codes.append(p.exit_code) 

您需要通過類似Can I get a return value from multiprocessing.Process?

得到返回值如果進程完成,進程的退出代碼爲0,如果它們已終止,進程的退出代碼爲非零。

技術來自: Join a group of python processes with a timeoutHow do you split a list into evenly sized chunks?


作爲另一種選擇,你可以只嘗試使用apply_async上multiprocessing.Pool

from multiprocessing import Pool, TimeoutError 
from time import sleep  

if __name__ == "__main__": 
    pool = Pool(processes=5) 
    processes = [pool.apply_async(f, [i]) for i in params] 
    results = [] 
    for process in processes: 
     try: 
      result.append(process.get(timeout=100)) 
     except TimeoutError as e: 
      results.append(e) 

注意上面可能等待超過100秒,每過程,就好像第一個過程需要50秒完成,第二個過程在其運行時間內將有50秒多餘的時間。更復雜的邏輯(如前面的例子)需要執行更嚴格的超時。

+0

第一種解決方案強制您等待100秒,即使所有進程最終在5秒內完成。您可能需要一個睡眠幾秒鐘的循環,然後檢查是否有任何進程仍在運行,何時返回睡眠狀態。 – dano 2014-09-22 15:59:50

+0

@dano是的,正在寫一個快速的答案。更新爲使用更好的邏輯 – Zags 2014-09-22 16:06:40

+0

它看起來像你做了某種複製/粘貼錯誤與您的編輯。縮進是關閉的,你調用'join'和'terminate'兩次。 – dano 2014-09-22 16:21:23