2014-11-01 32 views
18

我會將多個任務添加到芹菜隊列並等待結果。我有多種想法,我將如何利用某種形式的共享存儲(memcached,redis,db等)來實現這一點,然而,我認爲這是Celery可以自動處理的東西,但我無法在線找到任何資源。將n個任務添加到芹菜隊列並等待結果

代碼示例

def do_tasks(b): 
    for a in b: 
     c.delay(a) 

    return c.all_results_some_how() 

回答

21

對於芹菜> = 3.0,taskset的是贊成的groupdeprecated

from celery import group 
from tasks import add 

job = group([ 
      add.s(2, 2), 
      add.s(4, 4), 
      add.s(8, 8), 
      add.s(16, 16), 
      add.s(32, 32), 
]) 

等待:

result = job.apply_async() 
result.join() 
16

Task.delay返回AsyncResult。使用AsyncResult.get可以獲得每項任務的結果。

要做到這一點,你需要保持對任務的引用。

def do_tasks(b): 
    tasks = [] 
    for a in b: 
     tasks.append(c.delay(a)) 
    return [t.get() for t in tasks] 

或者你可以使用ResultSet

def do_tasks(b): 
    rs = ResultSet([]) 
    for a in b: 
     rs.add(c.delay(a)) 
    return rs.get() 
+0

工作就像一個魅力保存'ResultSet'需要在它的構造函數的結果(或空列表)的列表。我向帖子提交了一個修改來糾正它。 – Prydie 2014-11-02 20:25:51

+1

@Prydie,感謝您的反饋和糾正。 – falsetru 2014-11-03 03:09:58

2

我有一種預感,你是不是真的想要的延遲,但芹菜的異步功能。

我覺得你真的想要一個TaskSet

from celery.task.sets import TaskSet 
from someapp.tasks import sometask 

def do_tasks(b): 
    job = TaskSet([sometask.subtask((a,)) for a in b]) 
    result = job.apply_async() 
    # might want to handle result.successful() == False 
    return result.join()