2017-07-28 101 views
0

我正在創建一個自定義Celery任務類,以覆蓋在任務達到最大重試次數(on_failure)時發生的情況。如果任務失敗,我需要更新用戶模型的狀態。將參數傳遞給Celery任務的on_failure方法

下面是我的自定義任務類:

class ReadyTask(Task): 

    def run(self, user): 
     try: 
      user.get_results() 
     except Exception as exc: 
      raise self.retry(exc=exc, max_retries=3) 

    def on_failure(self, exc, task_id, *args, **kwargs): 
     user.status = Status.READY 
     user.save() 

如何傳遞用戶對象的on_failure()方法來更新其狀態?

回答

0

我相信你可以檢查你的argskwargs作爲用戶的id,如果你把它作爲參數發送給你的任務。如果你在kwargs中製作它,那麼你就不需要進行arg位置檢查。那麼只需從id中獲取用戶並進行更改?

因此,不要把它變成了run功能,而是作爲一個參數/關鍵字參數是您呼叫的任務功能,通過function.apply(kwargs),或function.apply_async(kwargs=kwargs)function.delay(kwargs)

所以:

user_id = kwargs.get('user_id')

# then resolve to user object, then update object

0

您也可以將對象綁定到你的自定義任務類。通過使用bind=True

class ReadyTask(Task): 

    def run(self, user): 

     self.user_object = user 

     try: 
      self.user_object.get_results() 
     except Exception as exc: 
      raise self.retry(exc=exc, max_retries=3) 

    def on_failure(self, exc, task_id, *args, **kwargs): 
     self.user_object.status = Status.READY 
     self.user_object.save() 

@app.task(base=ReadyTask, bind=True) 
def do_stuff(self, *args, **kwargs): 
    self.user_object.do_stuff()