2013-02-27 32 views
8

使用Python RQ,我們試圖動態管理工作進程。我們使用一個定製的工人腳本,該腳本(以簡化形式)如下:如何正確關閉Python RQ工作進程動態?

from rq import Connection, Worker 

queues_to_listen_on = get_queues_to_listen_on() 

with Connection(connection = get_worker_connection()): 
    w = Worker(queues_to_listen_on) 
    w.work() 

我們是在工人的關閉特別感興趣。我們主要關心的是如何正常關閉工作人員,以便在關閉之前完成當前工作。在適當的Worker對象上的request_stop(...)信號處理程序似乎正在做我們需要的東西,但似乎沒有辦法(至少據我所知)發射它,除非它是通過在終端中運行的工作進程上按CTRL+C

在我看來,有兩個可能的解決方案(也可以肯定是更多) - 按優先順序排列:

  1. 編程,使用rq庫,將信號發送到request_stop,從而觸發正常關閉。
  2. 以某種方式獲取正確進程的PID(不確定主進程或工作進程偵聽器進程)並使用其他方法將適當的信號發送到該進程。我們有一些方法可以完成,但它很可能需要更多的工作,並引入其他變量來解決我寧願被忽略的問題(例如,使用Fabric來運行遠程命令或沿着這些命令行的東西)。

如果有更好的方法來解決這個問題或者一個不同的替代方案來實現相同的目標,我將不勝感激您的建議。

+0

如果你需要PID,你實際上可以從w.pid中得到它 – Borys 2013-02-27 19:11:21

回答

4

選項1在設計方面絕對更好。

但是解決不必使用CTRL + C退出過程(我討厭太)的您的特定問題,您可以使用下面的策略爲你的員工:

# WORKER_NAME.py 
import os 

PID = os.getpid() 

@atexit.register 
def clean_shut(): 
    print "Clean shut performed" 

    try: 
     os.unlink("WORKER_NAME.%d" % PID) 
    except: 
     pass 

# Worker main 
def main(): 
    f = open("WORKER_NAME.%d" % PID, "w") 
    f.write("Delete this to end WORKER_NAME gracefully") 
    f.close() 

    while os.path.exists("WORKER_NAME.%d" % PID): 
     # Worker working 

而在你的主腳本,得到工作人員PID作爲@Borys建議,發送熱烈的停止請求,並os.unlink("path/to/WORKER_NAME.%d" % worker_PID)以確保正常關機:)

這隻適用於運行無限循環的工人。如果工作進程調用阻塞連續的一次性工作的事情,則必須進一步追蹤可能的阻塞例程以從那裏解析,例如應用某種超時策略。