2012-07-16 80 views
13

我有一個芹菜鏈運行一些任務。每個任務都可能失敗並被重試。請參閱下面一個簡單的例子:重試芹菜失敗的任務是鏈條的一部分

from celery import task 

@task(ignore_result=True) 
def add(x, y, fail=True): 
    try: 
     if fail: 
      raise Exception('Ugly exception.') 
     print '%d + %d = %d' % (x, y, x+y) 
    except Exception as e: 
     raise add.retry(args=(x, y, False), exc=e, countdown=10) 

@task(ignore_result=True) 
def mul(x, y): 
    print '%d * %d = %d' % (x, y, x*y) 

和鏈條:

from celery.canvas import chain 
chain(add.si(1, 2), mul.si(3, 4)).apply_async() 

運行兩個任務(並假設沒有失敗),你會得到/看印刷:

1 + 2 = 3 
3 * 4 = 12 

但是,當添加任務第一次失敗並在後續的重試調用中成功時,鏈中的其餘任務不會運行,即添加任務失敗,鏈中的所有其他任務都不會運行,並且在af幾秒後,添加任務再次運行併成功,鏈中的其餘任務(本例中爲mul.si(3,4))不會運行。

芹菜是否提供了一種方法來繼續從失敗的任務中繼續失敗鏈?如果不是,那麼完成此操作的最佳方法是什麼,並確保鏈的任務按照指定的順序運行,並且只有在前一個任務成功執行後纔會執行,即使任務重試了幾次也是如此。

注1:問題可以通過做

add.delay(1, 2).get() 
mul.delay(3, 4).get() 

來解決,但我想了解爲何鏈不失敗的任務。

回答

0

我也有興趣瞭解爲什麼連鎖不能在失敗的任務中工作。

我挖一些芹菜代碼,以及到目前爲止,我發現是:

實施happends在app.builtins.py

@shared_task 
def add_chain_task(app): 
    from celery.canvas import chord, group, maybe_subtask 
    _app = app 

    class Chain(app.Task): 
     app = _app 
     name = 'celery.chain' 
     accept_magic_kwargs = False 

     def prepare_steps(self, args, tasks): 
      steps = deque(tasks) 
      next_step = prev_task = prev_res = None 
      tasks, results = [], [] 
      i = 0 
      while steps: 
       # First task get partial args from chain. 
       task = maybe_subtask(steps.popleft()) 
       task = task.clone() if i else task.clone(args) 
       i += 1 
       tid = task.options.get('task_id') 
       if tid is None: 
        tid = task.options['task_id'] = uuid() 
       res = task.type.AsyncResult(tid) 

       # automatically upgrade group(..) | s to chord(group, s) 
       if isinstance(task, group): 
        try: 
         next_step = steps.popleft() 
        except IndexError: 
         next_step = None 
       if next_step is not None: 
        task = chord(task, body=next_step, task_id=tid) 
       if prev_task: 
        # link previous task to this task. 
        prev_task.link(task) 
        # set the results parent attribute. 
        res.parent = prev_res 

       results.append(res) 
       tasks.append(task) 
       prev_task, prev_res = task, res 

      return tasks, results 

     def apply_async(self, args=(), kwargs={}, group_id=None, chord=None, 
       task_id=None, **options): 
      if self.app.conf.CELERY_ALWAYS_EAGER: 
       return self.apply(args, kwargs, **options) 
      options.pop('publisher', None) 
      tasks, results = self.prepare_steps(args, kwargs['tasks']) 
      result = results[-1] 
      if group_id: 
       tasks[-1].set(group_id=group_id) 
      if chord: 
       tasks[-1].set(chord=chord) 
      if task_id: 
       tasks[-1].set(task_id=task_id) 
       result = tasks[-1].type.AsyncResult(task_id) 
      tasks[0].apply_async() 
      return result 

     def apply(self, args=(), kwargs={}, **options): 
      tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']] 
      res = prev = None 
      for task in tasks: 
       res = task.apply((prev.get(),) if prev else()) 
       res.parent, prev = prev, res 
      return res 
    return Chain 

你可以看到,在年底prepare_stepsprev_task鏈接到下一個任務。 當prev_task失敗時,未調用下一個任務。

我與添加從上一個任務link_error到下一個測試:

if prev_task: 
    # link and link_error previous task to this task. 
    prev_task.link(task) 
    prev_task.link_error(task) 
    # set the results parent attribute. 
    res.parent = prev_res 

但隨後,接下來的任務,必須照顧這兩種情況下(也許當它配置爲不可變的,除了如不接受更多的論據)。

我覺得鏈可以通過允許一些語法的支持,喜歡這樣的:

c = chain(t1, (t2, t1e), (t3, t2e))

這意味着:

t1linkt2link_errort1e

t2linkt3link_errort2e

+0

我決定使用運行,否則將在鏈中的所有任務的連鎖任務,而是等待一個任務開始在另一個之前完成,如:'task1.delay([PARAMS])。得到(); 。task2.delay([PARAMS])得到(); task3.delay([PARAMS])。得到()'。鏈式任務可以捕獲任何任務引發的異常並重試自身。 – Andrei 2012-07-25 09:45:13

+0

因此,從你的例子中,t1e和t2e必須分別調用t2和t3,對吧? – Andrei 2012-07-25 09:51:17

+0

這個例子只是我對可能的鏈式語法的思考。這意味着每個接下來的任務,現在確實是對的任務,如果在上一步中沒有異常發生/錯誤對中第一個元素將被調用,第二個元素是上一步驟的失敗異常/錯誤處理程序。 't1e'的意思是't1錯誤處理程序' – anh 2012-07-25 10:01:54