2017-08-12 43 views
0

我正在使用芹菜做一個長期的任務。該任務將使用subprocess.Popen創建子流程。爲了使任務夭折,我寫下面的代碼:在芹菜中,當工人即將關閉時如何中止運行任務?

from celery.contrib import abortable 

@task(bind=True, base=abortable.AbortableTask) 
def my_task(self, *args): 
    p = subprocess.Popen([...]) 
    while True: 
     try: 
      p.wait(1) 
     except subprocess.TimeoutExpired: 
      if self.is_aborted(): 
       p.terminate() 
       return 
     else: 
      break 
    # Other codes... 

我試着在我的控制檯,它運作良好。但是當我決定通過按Ctrl+C關閉工作人員時,程序打印出'worker: Warm shutdown (MainProcess)'並且被阻塞了很長時間,這不是我所期望的。 工作人員即將關閉時似乎不會發生任務墮胎。

從我知道如果我想中止任務,我應該使用手動任務ID實例化一個AbortableAsyncResult並調用其.abort()方法的文檔。但我無法找到任何代碼,因爲它需要所有正在運行的任務的ID,而我沒有辦法訪問它。

那麼,如何在員工即將關閉時調用.abort()執行所有正在運行的任務?或者有其他的選擇嗎?

我在Python 3.6.2中使用了芹菜4.1.0。

回答

1

您可以使用worker signals來達到此目的。只需獲取所有正在運行的任務並在其上調用.abort()。

+0

感謝您的回答。但是我很困惑如何獲得正在運行的任務列表。文檔說我應該使用'app.control.inspect().active()',但是它會一直返回'None',不管它放置哪個信號處理程序。 – hsfzxjy

+0

你可以試試https://stackoverflow.com/questions/5544629/retrieve-list-of-tasks-in-a-queue-in-celery。如果這也行不通,也許你可以使用任務信號來維護一個正在運行的任務列表。 – Ishaan

+0

它也不起作用...我剛剛閱讀了它的源代碼,發現在關閉期間檢查服務不可用。你有做這種任務的建議嗎? – hsfzxjy

0

通過@Ishaan的回答啓發,我解決它通過自己,使用如下面的代碼:

def my_task(*args): 
    p = None 
    from celery.platforms import signals 

    def int_handler(signum, frame): 
     if p is not None: 
      p.kill() 
      p.wait() 

    signals['INT'] = int_handler 

    p = subprocess.Popen([...]) 
    p.wait() 
    # Other codes... 

的解決方案是基於以下的考慮:

  • 的內範圍該任務是我能找到的每個工作人員執行的唯一地點。
  • 在任務中我可以輕鬆訪問創建的子流程。
  • SIGINT信號不由芹菜工作人員處理,因此它不會覆蓋芹菜的默認行爲。
  • 一名工人一次只能執行一項任務,因此這種註冊是安全的。