1

我想使用multiprocessing.Pool,但multiprocessing.Pool超時後無法中止任務。我發現solution和一些修改它。python多處理池超時

from multiprocessing import util, Pool, TimeoutError 
from multiprocessing.dummy import Pool as ThreadPool 
import threading 
import sys 
from functools import partial 
import time 


def worker(y): 
    print("worker sleep {} sec, thread: {}".format(y, threading.current_thread())) 
    start = time.time() 
    while True: 
     if time.time() - start >= y: 
      break 
     time.sleep(0.5) 
     # show work progress 
     print(y) 
    return y 


def collect_my_result(result): 
    print("Got result {}".format(result)) 


def abortable_worker(func, *args, **kwargs): 
    timeout = kwargs.get('timeout', None) 
    p = ThreadPool(1) 
    res = p.apply_async(func, args=args) 
    try: 
     # Wait timeout seconds for func to complete. 
     out = res.get(timeout) 
    except TimeoutError: 
     print("Aborting due to timeout {}".format(args[1])) 
     # kill worker itself when get TimeoutError 
     sys.exit(1) 
    else: 
     return out 


def empty_func(): 
    pass 


if __name__ == "__main__": 
    TIMEOUT = 4 
    util.log_to_stderr(util.DEBUG) 
    pool = Pool(processes=4) 

    # k - time to job sleep 
    featureClass = [(k,) for k in range(20, 0, -1)] # list of arguments 
    for f in featureClass: 
     # check available worker 
     pool.apply(empty_func) 

     # run job with timeout 
     abortable_func = partial(abortable_worker, worker, timeout=TIMEOUT) 
     pool.apply_async(abortable_func, args=f, callback=collect_my_result) 

    time.sleep(TIMEOUT) 
    pool.terminate() 
    print("exit") 

主要修改 - 工作進程出口與sys.exit(1)。這是殺死工人進程和殺死工作線程,但我不確定這個解決方案是否好。當進程終止與運行作業時,我可以得到什麼潛在的問題?

+0

好的。我想你最好在你的worker中處理超時()並將結果寫入一個通用集合。這樣,您只需要在所有線程上調用join(),然後處理結果。如果你的系統負載不重,那麼事情應該是正常的。 – mljli

回答

4

有在停止運行的作業沒有隱含的風險,該操作系統將採取的正確終止進程的照顧。

如果你的工作是寫在文件上,你的磁盤上可能會有很多被截斷的文件。

如果您在數據庫上編寫或者您使用某個遠程進程連接,也可能發生一些小問題。

儘管如此,Python標準池不支持超時和終止進程突然可能導致您的應用程序內怪異的行爲。

Pebble處理池支持超時任務。

from pebble import process, TimeoutError 

with process.Pool() as pool: 
    task = pool.schedule(function, args=[1,2], timeout=5) 

    try: 
     result = task.get() 
    except TimeoutError: 
     print "Task: %s took more than 5 seconds to complete" % task 
+0

它看起來不錯。你現在有沒有在生產中使用它的成功案例? – rusnasonov

+0

不知道我理解正確。你需要Pebble在生產或系統殺死過程方面的成功案例? Pebble是一個相當穩定的圖書館,擁有相當數量的下載量(http://pypi-ranking.info/module/Pebble)。 – noxdafox

+0

是的,你理解正確。你知道使用peeble的項目嗎? – rusnasonov