2015-10-26 71 views
0

我知道我可以return,但我想知道是否有其他的東西,尤其是對於輔助方法,其中return None會強制調用者在每次調用時添加樣板檢查的任務。從裏面取消芹菜任務?

我發現InvalidTaskError,但沒有真正的文件 - 這是一個內部的東西嗎?提出這個問題是否合適?

我正在尋找類似self.abort()的東西,類似於self.retry(),但沒有看到任何東西。

下面是我使用它的一個例子。

def helper(task, arg): 
    if unrecoverable_problems(arg): 
     # abort the task 
     raise InvalidTaskError() 

@task(bind=True) 
task_a(self, arg): 
    helper(task=self, arg=arg) 
    do_a(arg) 

@task(bind=True) 
task_b(self, arg): 
    helper(task=self, arg=arg) 
    do_b(arg) 

回答

0

在做了更多的挖掘之後,我找到了一個使用Reject的例子;

(從文檔頁面複製)

該任務可以提高拒絕到拒絕使用AMQPs basic_reject方法任務消息。除非啓用Task.acks_late,否則這將不起作用。

拒絕消息與確認消息具有相同的效果,但某些代理可能會實現可以使用的其他功能。例如RabbitMQ支持死信交換的概念,其中隊列可以配置爲使用被拒絕的消息被重新發送到的死信交換。

拒絕也可用於重新發送消息,但在使用此消息時請務必小心,因爲它很容易導致無限的消息循環。

實施例使用拒絕當任務導致內存不足:

import errno 
from celery.exceptions import Reject 

@app.task(bind=True, acks_late=True) 
def render_scene(self, path): 
    file = get_file(path) 
    try: 
     renderer.render_scene(file) 

    # if the file is too big to fit in memory 
    # we reject it so that it's redelivered to the dead letter exchange 
    # and we can manually inspect the situation. 
    except MemoryError as exc: 
     raise Reject(exc, requeue=False) 
    except OSError as exc: 
     if exc.errno == errno.ENOMEM: 
      raise Reject(exc, requeue=False) 

    # For any other error we retry after 10 seconds. 
    except Exception as exc: 
     raise self.retry(exc, countdown=10)