2011-07-05 101 views
6

我想要Celery任務取決於2個或更多其他任務的結果。我已經看過Python+Celery: Chaining jobs?http://pypi.python.org/pypi/celery-tasktree,但只有任務只有一項依賴任務時,這些纔是好的。依賴圖執行Celery任務

我知道TaskSet,但似乎沒有辦法在TaskSetResult.ready()變爲True時立即執行回調。我現在想的是有一個週期性任務,每隔幾毫秒左右輪詢TaskSetResult.ready(),並在回調函數觸發它,因爲它返回True,但對我來說聽起來相當不雅。

有什麼建議嗎?

回答

2

mrbox是真的,您可以重試,直到結果準備好,但在文檔中不太清楚,當您重試時必須通過setid和子任務元素,並且爲了恢復它必須使用映射函數,下面有一個示例代碼來解釋我的意思。

def run(self, setid=None, subtasks=None, **kwargs): 

    if not setid or not subtasks: 
     #Is the first time that I launch this task, I'm going to launch the subtasks 
     … 
     tasks = [] 
     for slice in slices: 
      tasks.append(uploadTrackSlice.subtask((slice,folder_name))) 

     job = TaskSet(tasks=tasks) 
     task_set_result = job.apply_async() 
     setid = task_set_result.taskset_id 
     subtasks = [result.task_id for result in task_set_result.subtasks] 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 

    #Is a retry than we just have to check the results   
    tasks_result = TaskSetResult(setid, map(AsyncResult,subtasks)) 
    if not tasks_result.ready(): 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 
    else:  
     if tasks_result.successful(): 
      return tasks_result.join() 
     else: 
      raise Exception("Some of the tasks was failing") 
2

恕我直言,你可以做某事類同於docs- link

或者你可以使用重試方法做與MAX_RETRIES =無的東西 - 如果「基地」的任務。就緒一個()是假的,你可以fire .retry()方法,直到完成「基本」任務。

7

在最新版本的芹菜(3.0 +),可以使用所謂的和絃達到預期的效果:

http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives

簡單的和絃

和絃原語使我們能夠添加回調,當一個組中的任務的所有 都完成執行時,這通常是算法th所需的 在不尷尬的並行:

>>> from celery import chord 
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() 
>>> res.get() 
90 

聲明:我沒有嘗試這樣做我自己呢。

相關問題