2016-02-03 85 views
1

我想找到方法來啓動一個新的進程,並得到它的輸出,如果它需要少於X秒。如果過程需要更多時間,我想忽略過程結果,殺死過程並繼續。啓動python進程輸出和超時

我需要基本上添加計時器到下面的代碼。現在確定是否有更好的方法來做到這一點,我打開了一個不同的更好的解決方案。

from multiprocessing import Process, Queue 

def f(q): 
    # Ugly work 
    q.put(['hello', 'world']) 

if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    print q.get() 
    p.join() 

謝謝!

+0

請參閱[超時在Python函數調用](http://stackoverflow.com/q/492519/4279)具體[此答案](http://stackoverflow.com/a/14924210/4279) – jfs

回答

1

您可能會發現下面的模塊在你的情況下非常有用:

模塊

#! /usr/bin/env python3 
"""Allow functions to be wrapped in a timeout API. 

Since code can take a long time to run and may need to terminate before 
finishing, this module provides a set_timeout decorator to wrap functions.""" 

__author__ = 'Stephen "Zero" Chappell ' \ 
      '<[email protected]>' 
__date__ = '18 December 2017' 
__version__ = 1, 0, 1 
__all__ = [ 
    'set_timeout', 
    'run_with_timeout' 
] 

import multiprocessing 
import sys 
import time 

DEFAULT_TIMEOUT = 60 


def set_timeout(limit=None): 
    """Return a wrapper that provides a timeout API for callers.""" 
    if limit is None: 
     limit = DEFAULT_TIMEOUT 
    _Timeout.validate_limit(limit) 

    def wrapper(entry_point): 
     return _Timeout(entry_point, limit) 

    return wrapper 


def run_with_timeout(limit, polling_interval, entry_point, *args, **kwargs): 
    """Execute a callable object and automatically poll for results.""" 
    engine = set_timeout(limit)(entry_point) 
    engine(*args, **kwargs) 
    while engine.ready is False: 
     time.sleep(polling_interval) 
    return engine.value 


def _target(queue, entry_point, *args, **kwargs): 
    """Help with multiprocessing calls by being a top-level module function.""" 
    # noinspection PyPep8,PyBroadException 
    try: 
     queue.put((True, entry_point(*args, **kwargs))) 
    except: 
     queue.put((False, sys.exc_info()[1])) 


class _Timeout: 
    """_Timeout(entry_point, limit) -> _Timeout instance""" 

    def __init__(self, entry_point, limit): 
     """Initialize the _Timeout instance will all needed attributes.""" 
     self.__entry_point = entry_point 
     self.__limit = limit 
     self.__queue = multiprocessing.Queue() 
     self.__process = multiprocessing.Process() 
     self.__timeout = time.monotonic() 

    def __call__(self, *args, **kwargs): 
     """Begin execution of the entry point in a separate process.""" 
     self.cancel() 
     self.__queue = multiprocessing.Queue(1) 
     self.__process = multiprocessing.Process(
      target=_target, 
      args=(self.__queue, self.__entry_point) + args, 
      kwargs=kwargs 
     ) 
     self.__process.daemon = True 
     self.__process.start() 
     self.__timeout = time.monotonic() + self.__limit 

    def cancel(self): 
     """Terminate execution if possible.""" 
     if self.__process.is_alive(): 
      self.__process.terminate() 

    @property 
    def ready(self): 
     """Property letting callers know if a returned value is available.""" 
     if self.__queue.full(): 
      return True 
     elif not self.__queue.empty(): 
      return True 
     elif self.__timeout < time.monotonic(): 
      self.cancel() 
     else: 
      return False 

    @property 
    def value(self): 
     """Property that retrieves a returned value if available.""" 
     if self.ready is True: 
      valid, value = self.__queue.get() 
      if valid: 
       return value 
      raise value 
     raise TimeoutError('execution timed out before terminating') 

    @property 
    def limit(self): 
     """Property controlling what the timeout period is in seconds.""" 
     return self.__limit 

    @limit.setter 
    def limit(self, value): 
     self.validate_limit(value) 
     self.__limit = value 

    @staticmethod 
    def validate_limit(value): 
     """Verify that the limit's value is not too low.""" 
     if value <= 0: 
      raise ValueError('limit must be greater than zero') 

要使用,請看下面的例子演示其用法:

from time import sleep 


def main(): 
    timeout_after_four_seconds = timeout(4) 
    # create copies of a function that have a timeout 
    a = timeout_after_four_seconds(do_something) 
    b = timeout_after_four_seconds(do_something) 
    c = timeout_after_four_seconds(do_something) 
    # execute the functions in separate processes 
    a('Hello', 1) 
    b('World', 5) 
    c('Jacob', 3) 
    # poll the functions to find out what they returned 
    results = [a, b, c] 
    polling = set(results) 
    while polling: 
     for process, name in zip(results, 'abc'): 
      if process in polling: 
       ready = process.ready 
       if ready is True:  # if the function returned 
        print(name, 'returned', process.value) 
        polling.remove(process) 
       elif ready is None:  # if the function took too long 
        print(name, 'reached timeout') 
        polling.remove(process) 
       else:     # if the function is running 
        assert ready is False, 'ready must be True, False, or None' 
     sleep(0.1) 
    print('Done.') 


def do_something(data, work): 
    sleep(work) 
    print(data) 
    return work 


if __name__ == '__main__': 
    main() 
+1

不得不更改clock()的日期時間,因爲它運行在樹莓派上,並且原始代碼工作不正常。 謝謝! – MrCatacroquer

0

您正在運行的進程是否包含循環? 如果是這樣,您可以在啓動循環之前獲取時間戳,並在循環中包含一個if語句並使用sys.exit();如果當前時間戳與記錄的開始時間標記相差超過x秒,則命令終止腳本。

0

所有你需要適應the queue example from the docs到你的情況是超時傳遞給q.get()通話和超時終止進程:

from Queue import Empty 
... 

try: 
    print q.get(timeout=timeout) 
except Empty: # no value, timeout occured 
    p.terminate() 
    q = None # the queue might be corrupted after the `terminate()` call 
p.join() 

使用Pipe可能更輕巧,否則代碼是相同的(您可以使用.poll(timeout)來確定是否有數據可以接收)。

相關問題