2012-09-21 60 views
7

我在我的應用程序中使用celery來運行定期任務。讓我們來看看下面芹菜的回調apply_async

from myqueue import Queue 
@perodic_task(run_every=timedelta(minutes=1)) 
def process_queue(): 
    queue = Queue() 
    uid, questions = queue.pop() 
    if uid is None: 
     return 

    job = group(do_stuff(q) for q in questions) 
    job.apply_async() 

def do_stuff(question): 
    try: 
     ... 
    except: 
     ... 
     raise 

簡單的例子,你可以在上面的例子中看到的,我使用celery運行異步任務,但(因爲它是一個隊列)我需要做的queue.fail(uid)在例外的情況下,do_stuffqueue.ack(uid)否則。在這種情況下,在這兩種情況下,我的任務會有一些回調 - on_failureon_success,這將非常明確和有用。

我看到一些documentation,但從來沒有見過與apply_async使用回調的做法。有可能這樣做嗎?

回答

26

子類的任務類和重載on_success和ON_FAILURE功能:

class CallbackTask(Task): 
    def on_success(self, retval, task_id, args, kwargs): 
     pass 

    def on_failure(self, exc, task_id, args, kwargs, einfo): 
     pass 


@celery.task(base=CallbackTask) # this does the trick 
def add(x, y): 
    return x + y