2011-03-17 101 views
14

有沒有辦法確定任何任務是否丟失並重試?重試丟失或失敗的任務(芹菜,Django和RabbitMQ)

我認爲丟失的原因可能是調度程序錯誤或工作者線程崩潰。

我打算重試他們,但我不知道如何確定哪些任務需要重試?

以及如何自動完成此過程?我可以使用自己的自定義調度程序來創建新任務嗎?

編輯:我從文檔中發現RabbitMQ永遠不會鬆動任務,但是當工作線程在任務執行過程中崩潰時會發生什麼?

回答

26

你需要做的是設置

CELERY_ACKS_LATE =真

晚ACK意味着任務已經執行, 不僅僅是之前之後,這是默認行爲的任務消息將被確認。 這樣如果工作人員崩潰,兔子MQ仍然會有消息。

顯然總崩潰(兔子+工人)在同一時間沒有辦法恢復任務,除非您實施登錄任務開始和任務結束。 就我個人而言,每次任務開始時,我會在一行中寫入一行,當任務完成時(另一個任務結束時),通過分析mongo日誌我可以知道哪個任務被中斷。

您可以輕鬆地通過覆蓋芹菜基任務類的方法__call__after_return來完成。

以下您看到一段代碼,它使用taskLogger類作爲上下文管理器(具有入口和出口點)。 taskLogger類只是將包含任務信息的行寫入mongodb實例中。

def __call__(self, *args, **kwargs): 
    """In celery task this function call the run method, here you can 
    set some environment variable before the run of the task""" 

    #Inizialize context managers  

    self.taskLogger = TaskLogger(args, kwargs) 
    self.taskLogger.__enter__() 

    return self.run(*args, **kwargs) 

def after_return(self, status, retval, task_id, args, kwargs, einfo): 
    #exit point for context managers 
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo) 

我希望這能幫助