有沒有辦法確定任何任務是否丟失並重試?重試丟失或失敗的任務(芹菜,Django和RabbitMQ)
我認爲丟失的原因可能是調度程序錯誤或工作者線程崩潰。
我打算重試他們,但我不知道如何確定哪些任務需要重試?
以及如何自動完成此過程?我可以使用自己的自定義調度程序來創建新任務嗎?
編輯:我從文檔中發現RabbitMQ永遠不會鬆動任務,但是當工作線程在任務執行過程中崩潰時會發生什麼?
有沒有辦法確定任何任務是否丟失並重試?重試丟失或失敗的任務(芹菜,Django和RabbitMQ)
我認爲丟失的原因可能是調度程序錯誤或工作者線程崩潰。
我打算重試他們,但我不知道如何確定哪些任務需要重試?
以及如何自動完成此過程?我可以使用自己的自定義調度程序來創建新任務嗎?
編輯:我從文檔中發現RabbitMQ永遠不會鬆動任務,但是當工作線程在任務執行過程中崩潰時會發生什麼?
你需要做的是設置
CELERY_ACKS_LATE =真
晚ACK意味着任務已經執行, 不僅僅是之前之後,這是默認行爲的任務消息將被確認。 這樣如果工作人員崩潰,兔子MQ仍然會有消息。
顯然總崩潰(兔子+工人)在同一時間沒有辦法恢復任務,除非您實施登錄任務開始和任務結束。 就我個人而言,每次任務開始時,我會在一行中寫入一行,當任務完成時(另一個任務結束時),通過分析mongo日誌我可以知道哪個任務被中斷。
您可以輕鬆地通過覆蓋芹菜基任務類的方法__call__
和after_return
來完成。
以下您看到一段代碼,它使用taskLogger類作爲上下文管理器(具有入口和出口點)。 taskLogger類只是將包含任務信息的行寫入mongodb實例中。
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
#Inizialize context managers
self.taskLogger = TaskLogger(args, kwargs)
self.taskLogger.__enter__()
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point for context managers
self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)
我希望這能幫助