我正在使用芹菜做一個長期的任務。該任務將使用subprocess.Popen
創建子流程。爲了使任務夭折,我寫下面的代碼:在芹菜中,當工人即將關閉時如何中止運行任務?
from celery.contrib import abortable
@task(bind=True, base=abortable.AbortableTask)
def my_task(self, *args):
p = subprocess.Popen([...])
while True:
try:
p.wait(1)
except subprocess.TimeoutExpired:
if self.is_aborted():
p.terminate()
return
else:
break
# Other codes...
我試着在我的控制檯,它運作良好。但是當我決定通過按Ctrl+C
關閉工作人員時,程序打印出'worker: Warm shutdown (MainProcess)'
並且被阻塞了很長時間,這不是我所期望的。 工作人員即將關閉時似乎不會發生任務墮胎。
從我知道如果我想中止任務,我應該使用手動任務ID實例化一個AbortableAsyncResult
並調用其.abort()
方法的文檔。但我無法找到任何代碼,因爲它需要所有正在運行的任務的ID,而我沒有辦法訪問它。
那麼,如何在員工即將關閉時調用.abort()
執行所有正在運行的任務?或者有其他的選擇嗎?
我在Python 3.6.2中使用了芹菜4.1.0。
感謝您的回答。但是我很困惑如何獲得正在運行的任務列表。文檔說我應該使用'app.control.inspect().active()',但是它會一直返回'None',不管它放置哪個信號處理程序。 – hsfzxjy
你可以試試https://stackoverflow.com/questions/5544629/retrieve-list-of-tasks-in-a-queue-in-celery。如果這也行不通,也許你可以使用任務信號來維護一個正在運行的任務列表。 – Ishaan
它也不起作用...我剛剛閱讀了它的源代碼,發現在關閉期間檢查服務不可用。你有做這種任務的建議嗎? – hsfzxjy