2014-09-05 203 views
11

這應該是非常簡單的,我很驚訝,我一直無法找到已經在stackoverflow上回答的問題。多線程Python中的信號處理

我有一個程序需要響應SIGTERM和SIGINT信號的守護進程,以便與新貴們一起工作。我讀過這樣做的最好方法是在與主線程分離的線程中運行程序的主循環,並讓主線程處理信號。然後當收到一個信號時,信號處理程序應該通過設置一個常規在主循環中檢查的標記標誌來告訴主循環退出。

我試過這樣做,但它不按我期望的方式工作。請參見下面的代碼:

from threading import Thread 
import signal 
import time 
import sys 

stop_requested = False  

def sig_handler(signum, frame): 
    sys.stdout.write("handling signal: %s\n" % signum) 
    sys.stdout.flush() 

    global stop_requested 
    stop_requested = True  

def run(): 
    sys.stdout.write("run started\n") 
    sys.stdout.flush() 
    while not stop_requested: 
     time.sleep(2) 

    sys.stdout.write("run exited\n") 
    sys.stdout.flush() 

signal.signal(signal.SIGTERM, sig_handler) 
signal.signal(signal.SIGINT, sig_handler) 

t = Thread(target=run) 
t.start() 
t.join() 
sys.stdout.write("join completed\n") 
sys.stdout.flush() 

我測試了在以下兩個方面:

1)

$ python main.py > output.txt& 
[2] 3204 
$ kill -15 3204 

2)

$ python main.py 
ctrl+c 

在這兩種情況下,我希望這寫到輸出:

run started 
handling signal: 15 
run exited 
join completed 

在第一種情況,程序退出,但我看到的是:

run started 

在當按下CTRL + C和程序不退出SIGTERM信號被忽略貌似第二殼體。

我在這裏錯過了什麼?

+3

嘗試用'while t.is_alive():t.join(1)'替換't.join()'。您的主線程會每秒醒來檢查一次信號。 – roippi 2014-09-05 01:34:51

+2

更多閱讀:http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html – roippi 2014-09-05 01:37:25

回答

18

的問題是,如在解釋Execution of Python signal handlers

一個Python信號處理程序不獲取低級(C)信號處理器中執行。取而代之的是,低電平信號的處理程序設置一個標誌,它告訴虛擬機在稍後的點(在下一字節代碼指令例如),以執行相應的Python信號處理程序

...

長期運行的計算無論接收到任何信號,純粹以C語言實現(例如,在大量文本上進行正則表達式匹配)可以在任意時間內不間斷地運行。計算結束時將調用Python信號處理程序。

您的主線程被阻止在threading.Thread.join上,這最終意味着它被阻止在C調用pthread_join。當然,這不是一個「長時間運行的計算」,它是一個系統調用塊......但是,直到調用結束,你的信號處理程序不能運行。

而且,雖然在某些平臺上pthread_join會因信號而失敗,EINTR,但在其他平臺上則不會。在linux上,我相信這取決於你是選擇BSD風格還是默認的siginterrupt行爲,但默認是否定的。


那麼,你能做些什麼呢?

嗯,我敢肯定changes to signal handling in Python 3.3實際上改變了Linux的默認行爲,所以如果你升級你不需要做任何事情;只需運行在3.3+以下,並且您的代碼將按照您的預期工作。至少它適用於OS X上的CPython 3.4和Linux上的3.3。 (如果我對此有錯,我不確定它是否是CPython中的錯誤,因此您可能希望在python-list中提高它,而不是打開問題......)

另一方面, 3.3之前版本,signal模塊肯定不會公開自己需要的工具來解決這個問題。所以,如果你不能升級到3.3,那麼解決辦法就是等待一些可中斷的東西,比如ConditionEvent。子線程在退出之前通知事件,主線程在加入子線程之前等待事件。這絕對是不好的。我找不到任何能保證它有所作爲的東西;它恰好適用於我在OS X上的各種版本的CPython 2.7和3.2以及Linux上的2.6和2.7 ...

+0

「這絕對是個黑客」 - 我不會那麼說。在一個更高的抽象層次上同步你的線程,而不是簡單地使用'join'是明智的。如果你的目標是等待線程退出(就像這個特定的例子),那麼'join'就是正確的工具。如果您想等待工作量完成,那麼「條件」等就更有意義了。畢竟,工作負載可以在不會立即退出的池化線程中執行(例如)。 – 2018-02-04 08:13:33

8

abarnert的答案是現貨。但我仍然在使用Python 2.7。爲了解決這個問題,我寫了一個InterruptableThread類。

現在它不允許將其他參數傳遞給線程目標。連接也不接受超時參數。這只是因爲我不需要那樣做。你可以添加它,如果你想。如果你自己使用它,你可能會想要移除輸出語句。他們只是作爲評論和測試的一種方式。

import threading 
import signal 
import sys 

class InvalidOperationException(Exception): 
    pass  

# noinspection PyClassHasNoInit 
class GlobalInterruptableThreadHandler: 
    threads = [] 
    initialized = False 

    @staticmethod 
    def initialize(): 
     signal.signal(signal.SIGTERM, GlobalInterruptableThreadHandler.sig_handler) 
     signal.signal(signal.SIGINT, GlobalInterruptableThreadHandler.sig_handler) 
     GlobalInterruptableThreadHandler.initialized = True 

    @staticmethod 
    def add_thread(thread): 
     if threading.current_thread().name != 'MainThread': 
      raise InvalidOperationException("InterruptableThread objects may only be started from the Main thread.") 

     if not GlobalInterruptableThreadHandler.initialized: 
      GlobalInterruptableThreadHandler.initialize() 

     GlobalInterruptableThreadHandler.threads.append(thread) 

    @staticmethod 
    def sig_handler(signum, frame): 
     sys.stdout.write("handling signal: %s\n" % signum) 
     sys.stdout.flush() 

     for thread in GlobalInterruptableThreadHandler.threads: 
      thread.stop() 

     GlobalInterruptableThreadHandler.threads = []  

class InterruptableThread: 
    def __init__(self, target=None): 
     self.stop_requested = threading.Event() 
     self.t = threading.Thread(target=target, args=[self]) if target else threading.Thread(target=self.run) 

    def run(self): 
     pass 

    def start(self): 
     GlobalInterruptableThreadHandler.add_thread(self) 
     self.t.start() 

    def stop(self): 
     self.stop_requested.set() 

    def is_stop_requested(self): 
     return self.stop_requested.is_set() 

    def join(self): 
     try: 
      while self.t.is_alive(): 
       self.t.join(timeout=1) 
     except (KeyboardInterrupt, SystemExit): 
      self.stop_requested.set() 
      self.t.join() 

     sys.stdout.write("join completed\n") 
     sys.stdout.flush() 

該類可以使用兩種不同的方法。你可以分類InterruptableThread:

import time 
import sys 
from interruptable_thread import InterruptableThread 

class Foo(InterruptableThread): 
    def __init__(self): 
     InterruptableThread.__init__(self) 

    def run(self): 
     sys.stdout.write("run started\n") 
     sys.stdout.flush() 
     while not self.is_stop_requested(): 
      time.sleep(2) 

     sys.stdout.write("run exited\n") 
     sys.stdout.flush() 

sys.stdout.write("all exited\n") 
sys.stdout.flush() 

foo = Foo() 
foo2 = Foo() 
foo.start() 
foo2.start() 
foo.join() 
foo2.join() 

或者你可以使用它更像threading.thread工作方式。儘管運行方法必須將InterruptableThread對象作爲參數。

import time 
import sys 
from interruptable_thread import InterruptableThread 

def run(t): 
    sys.stdout.write("run started\n") 
    sys.stdout.flush() 
    while not t.is_stop_requested(): 
     time.sleep(2) 

    sys.stdout.write("run exited\n") 
    sys.stdout.flush() 

t1 = InterruptableThread(run) 
t2 = InterruptableThread(run) 
t1.start() 
t2.start() 
t1.join() 
t2.join() 

sys.stdout.write("all exited\n") 
sys.stdout.flush() 

用它做什麼你會。

1

我在這裏遇到了同樣的問題signal not handled when multiple threads join。在閱讀abarnert的回答後,我改爲使用Python 3並解決了這個問題。但我確實喜歡將我的所有程序都更改爲python 3.所以,我通過在發送信號之前避免調用線程join()來解決我的程序。以下是我的代碼。

這不是很好,但解決了我的程序在python 2.7。我的問題被標記爲重複,所以我把我的解決方案放在這裏。

import threading, signal, time, os 


RUNNING = True 
threads = [] 

def monitoring(tid, itemId=None, threshold=None): 
    global RUNNING 
    while(RUNNING): 
     print "PID=", os.getpid(), ";id=", tid 
     time.sleep(2) 
    print "Thread stopped:", tid 


def handler(signum, frame): 
    print "Signal is received:" + str(signum) 
    global RUNNING 
    RUNNING=False 
    #global threads 

if __name__ == '__main__': 
    signal.signal(signal.SIGUSR1, handler) 
    signal.signal(signal.SIGUSR2, handler) 
    signal.signal(signal.SIGALRM, handler) 
    signal.signal(signal.SIGINT, handler) 
    signal.signal(signal.SIGQUIT, handler) 

    print "Starting all threads..." 
    thread1 = threading.Thread(target=monitoring, args=(1,), kwargs={'itemId':'1', 'threshold':60}) 
    thread1.start() 
    threads.append(thread1) 
    thread2 = threading.Thread(target=monitoring, args=(2,), kwargs={'itemId':'2', 'threshold':60}) 
    thread2.start() 
    threads.append(thread2) 
    while(RUNNING): 
     print "Main program is sleeping." 
     time.sleep(30) 
    for thread in threads: 
     thread.join() 

    print "All threads stopped."