2011-06-27 92 views
17

我試圖異步使用Web服務,因爲它需要長達45秒才能返回。不幸的是,這個網絡服務也有點不可靠,可能會引發錯誤。我已經設置了django-celery並執行了我的任務,這可以正常工作,直到任務失敗超出max_retries從任務恢復失敗超出max_retries

這是我到目前爲止有:

@task(default_retry_delay=5, max_retries=10) 
def request(xml): 
    try: 
     server = Client('https://www.whatever.net/RealTimeService.asmx?wsdl') 
     xml = server.service.RunRealTimeXML(
      username=settings.WS_USERNAME, 
      password=settings.WS_PASSWORD, 
      xml=xml 
     ) 
    except Exception, e: 
     result = Result(celery_id=request.request.id, details=e.reason, status="i") 
     result.save() 
     try: 
      return request.retry(exc=e) 
     except MaxRetriesExceededError, e: 
      result = Result(celery_id=request.request.id, details="Max Retries Exceeded", status="f") 
      result.save() 
      raise 
    result = Result(celery_id=request.request.id, details=xml, status="s") 
    result.save() 
    return result 

不幸的是,MaxRetriesExceededError沒有被retry()拋出,所以我不知道如何處理這種任務的失敗。 Django已經將HTML返回給客戶端,並且我正在通過AJAX檢查Result的內容,該AJAX永遠不會完全失敗f狀態。

所以問題是:當芹菜任務超過max_retries時,如何更新我的數據庫?

回答

14

您可以覆蓋芹菜任務類的after_return方法,這種方法無論是退役狀態的任務執行後調用(成功,失敗,重試)

class MyTask(celery.task.Task) 

    def run(self, xml, **kwargs) 
     #Your stuffs here 

    def after_return(self, status, retval, task_id, args, kwargs, einfo=None): 
     if self.max_retries == int(kwargs['task_retries']): 
      #If max retries are equals to task retries do something 
     if status == "FAILURE": 
      #You can do also something if the tasks fail instead of check the retries 

http://readthedocs.org/docs/celery/en/latest/reference/celery.task.base.html#celery.task.base.BaseTask.after_return

http://celery.readthedocs.org/en/latest/reference/celery.app.task.html?highlight=after_return#celery.app.task.Task.after_return

+0

由於鏈接顯然已經過時了,現在,[這裏是一個新的](http://celery.readthedocs.org/en/latest/reference/celery.app.task.html?highlight=after_return#celery.app.task.Task.after_return) – rschwieb

+0

謝謝,回答更新。 –

15

芹菜2.3.2版本這種方法行之有效,我:

class MyTask(celery.task.Task): 
    abstract = True 

    def after_return(self, status, retval, task_id, args, kwargs, einfo): 
     if self.max_retries == self.request.retries: 
      #If max retries is equal to task retries do something 

@task(base=MyTask, default_retry_delay=5, max_retries=10) 
def request(xml): 
    #Your stuff here 
6

我現在就和這個一起,爲我省去了繼任任務的工作,並且很容易理解。

# auto-retry with delay as defined below. After that, hook is disabled. 
@celery.shared_task(bind=True, max_retries=5, default_retry_delay=300) 
def post_data(self, hook_object_id, url, event, payload): 
    headers = {'Content-type': 'application/json'} 
    try: 
     r = requests.post(url, data=payload, headers=headers) 
     r.raise_for_status() 
    except requests.exceptions.RequestException as e: 
     if self.request.retries >= self.max_retries: 
      log.warning("Auto-deactivating webhook %s for event %s", hook_object_id, event) 
      Webhook.objects.filter(object_id=hook_object_id).update(active=False) 
      return False 
     raise self.retry(exc=e) 
    return True 
8

這個問題是,芹菜試圖重新提高您在通過重試限制時傳入的異常。這樣做的再認識的代碼是在這裏:https://github.com/celery/celery/blob/v3.1.20/celery/app/task.py#L673-L681

解決這個問題的最簡單的方法是隻沒有芹菜所有管理例外情況:

@task(max_retries=10) 
def mytask(): 
    try: 
     do_the_thing() 
    except Exception as e: 
     try: 
      mytask.retry() 
     except MaxRetriesExceededError: 
      do_something_to_handle_the_error() 
      logger.exception(e) 
+1

這是原始問題的正確解決方案。 –