2017-03-07 68 views
2

我有一個應用程序依靠超時信號來執行一些阻塞操作。Python線程中超時信號的替代方案

例如:

def wait_timeout(signum, frame): 
    raise Exception("timeout") 

signal.signal(signal.SIGALRM, wait_timeout) 
signal.setitimer(signal.ITIMER_REAL, 5) 

try: 
    while true: 
     print("zzz") 
     sleep(1) 
except Exception as e: 
    # timeout 
    print("Time's up") 

使用相同的方法現在,我已經實現了多線程,但所有的線程我得到ValueError: signal only works in main thread

我假設信號超時的方法不適用於線程。

不幸的是我不能使用這樣的事情:

timeout = 5 
start = time.time() 

while true: 
    print("zzz") 
    sleep(1) 
    if time.time() <= start+timeout: 
     print("Time's up) 
     break 

由於操作while循環可能會阻止並可能永遠持續下去,因此循環可能永遠達不到的,如果條款。

問:我該如何在線程中實現一個超時,就像我曾經用信號做過的那樣?

編輯:我碰到過this blog post,在python中顯示了類似的解決方案,用於JavaScript中的setTimeout()。我認爲這可能是一個可能的解決方案,但我真的不知道如何使用它。

EDIT2:我開始在主線程如下:

p = Popen(["tool", "--param", arg], stdin=PIPE, stdout=PIPE, stderr=STDOUT) 
t = Thread(target=process_thread, daemon=True, args=(p,arg1,arg2)) 
t.start() 

process_thread函數處理tool的標準輸出,通過執行以下操作:

for line in p.stdout: 
    # process line of the processes stdout 

該過程可以永遠拿走,例如一旦tool不產生任何輸出。我只想要tool的輸出,比方說5秒,所以for循環需要在特定超時後中斷。

這就是我使用的信號,但顯然他們不能在線程中工作。

edit3:我已經創建了一個更精細和準確的示例,說明如何在線程中使用信號。 See the gist here

+0

你是如何開始你的線程?當你定義它們時,你是否設置了'daemon = True'?如果是這樣,那麼當主線程死亡時,這些線程將被終止。那是你想要做什麼? – Billy

+0

是的,我以deamons開頭,我會在一分鐘內編輯OP。不,那不是我正在嘗試的,我會盡力在OP中更好地解釋它。 – SaAtomic

+0

我已經更新了OP @Billy – SaAtomic

回答

2

您要尋找的是看門狗

def watchdog(queue): 
    while True: 
     watch = queue.get() 
     time.sleep(watch.seconds) 

     try: 
      watch = queue.get_nowait() 
      # No except, got queue message, 
      # do noting wait for next watch 

     except queue.Empty: 
      os.kill(watch.pid, signal.SIGKILL) 

def workload_thread(queue): 
    pid = os.getpid() 
    queue.put({'pid':pid, 'seconds':5}) 

    # do your work 
    # Test Watchdog 
    # time.sleep(6) 

    queue.put({'pid':pid, 'done':True}) 

注:代碼沒有測試過,可能有語法錯誤!

+0

我覺得ike我以前見過/使用過這個或類似的東西,但我無法包裹我的現在就圍着它走。在研究看門狗主題時,我也發現[this](http://liveincode.blogspot.de/2012/11/watchdog-timer-in-python.html),但我仍然不知道從哪裏開始。你會如此善良,以我發佈的代碼創建一個例子嗎? – SaAtomic

1

這實現了class Terminator, ,它發送一個給定的timeout=5Threads Popen processsignal.SIG...。可能有多個不同的pid

class Terminator(object): 
    class WObj(): 
     def __init__(self, process, timeout=0, sig=signal.SIGABRT): 
      self.process = process 
      self.timeout = timeout 
      self.sig = sig 

    def __init__(self): 
     self.__queue = queue.Queue() 
     self.__t = Thread(target=self.__sigterm_thread, args=(self.__queue,)) 
     self.__t.start() 
     time.sleep(0.1) 

    def __sigterm_thread(self, q): 
     w = {} 
     t = 0 
     while True: 
      time.sleep(0.1); 
      t += 1 
      try: 
       p = q.get_nowait() 
       if p.process == 0 and p.sig == signal.SIGTERM: 
        # Terminate sigterm_thread 
        return 1 

       if p.process.pid not in w: 
        if p.timeout > 0 and p.sig != signal.SIGABRT: 
         w[p.process.pid] = p 
       else: 
        if p.sig == signal.SIGABRT: 
         del (w[p.process.pid]) 
        else: 
         w[p.process.pid].timeout = p.timeout 

      except queue.Empty: 
       pass 

      if t == 10: 
       for key in list(w.keys()): 
        p = w[key] 
        p.timeout -= 1 
        if p.timeout == 0: 
         """ A None value indicates that the process hasn't terminated yet. """ 
         if p.process.poll() == None: 
          p.process.send_signal(p.sig) 
         del (w[p.process.pid]) 
       t = 0 
      # end if t == 10 
     # end while True 

    def signal(self, process, timeout=0, sig=signal.SIGABRT): 
     self.__queue.put(self.WObj(process, timeout, sig)) 
     time.sleep(0.1) 

    def close(self, process): 
     self.__queue.put(self.WObj(process, 0, signal.SIGABRT)) 
     time.sleep(0.1) 

    def terminate(self): 
     while not self.__queue.empty(): 
      trash = self.__queue.get() 

     if self.__t.is_alive(): 
      self.__queue.put(self.WObj(0, 0, signal.SIGTERM)) 

    def __enter__(self): 
     return self 

    def __exit__(self, exc_type, exc_val, exc_tb): 
     self.__del__() 

    def __del__(self): 
     self.terminate() 

例如,這是工作量:

def workload(n, sigterm): 
    print('Start workload(%s)' % n) 
    arg = str(n) 
    p = Popen(["tool", "--param", arg], stdin=PIPE, stdout=PIPE, stderr=STDOUT) 

    sigterm.signal(p, timeout=4, sig=signal.SIGTERM) 
    while True: 
     for line in p.stdout: 
      # process line of the processes stdout 
      print(line.strip()) 
      time.sleep(1) 

     if p.poll() != None: 
      break 

    sigterm.close(p) 
    time.sleep(0.1) 
    print('Exit workload(%s)' % n) 

if __name__ == '__main__': 
    with Terminator() as sigterm: 
     p1 = Thread(target=workload, args=(1, sigterm)); p1.start(); time.sleep(0.1) 
     p2 = Thread(target=workload, args=(2, sigterm)); p2.start(); time.sleep(0.1) 
     p3 = Thread(target=workload, args=(3, sigterm)); p3.start(); time.sleep(0.1) 
     p1.join(); p2.join(); p3.join() 

     time.sleep(0.5) 
    print('EXIT __main__') 

測試使用Python 3.4.2和Python:2.7.9

+0

謝謝您的反饋和示例,但我認爲這不適用於多線程,如OP中所述。我在OP中添加了一個更詳細的代碼片段作爲鏈接。 – SaAtomic

+0

此外,如果我嘗試將該解決方案擴展到多個線程,則會遇到問題,即我的所有線程都報告相同的PID。 – SaAtomic

+1

是的,它不可擴展。在閱讀更詳細的代碼片段後,將回來一個scalabel解決方案。 – stovfl