我會將多個任務添加到芹菜隊列並等待結果。我有多種想法,我將如何利用某種形式的共享存儲(memcached,redis,db等)來實現這一點,然而,我認爲這是Celery可以自動處理的東西,但我無法在線找到任何資源。將n個任務添加到芹菜隊列並等待結果
代碼示例
def do_tasks(b):
for a in b:
c.delay(a)
return c.all_results_some_how()
我會將多個任務添加到芹菜隊列並等待結果。我有多種想法,我將如何利用某種形式的共享存儲(memcached,redis,db等)來實現這一點,然而,我認爲這是Celery可以自動處理的東西,但我無法在線找到任何資源。將n個任務添加到芹菜隊列並等待結果
代碼示例
def do_tasks(b):
for a in b:
c.delay(a)
return c.all_results_some_how()
對於芹菜> = 3.0,taskset的是贊成的groupdeprecated。
from celery import group
from tasks import add
job = group([
add.s(2, 2),
add.s(4, 4),
add.s(8, 8),
add.s(16, 16),
add.s(32, 32),
])
等待:
result = job.apply_async()
result.join()
Task.delay
返回AsyncResult
。使用AsyncResult.get
可以獲得每項任務的結果。
要做到這一點,你需要保持對任務的引用。
def do_tasks(b):
tasks = []
for a in b:
tasks.append(c.delay(a))
return [t.get() for t in tasks]
或者你可以使用ResultSet
:
def do_tasks(b):
rs = ResultSet([])
for a in b:
rs.add(c.delay(a))
return rs.get()
我有一種預感,你是不是真的想要的延遲,但芹菜的異步功能。
我覺得你真的想要一個TaskSet:
from celery.task.sets import TaskSet
from someapp.tasks import sometask
def do_tasks(b):
job = TaskSet([sometask.subtask((a,)) for a in b])
result = job.apply_async()
# might want to handle result.successful() == False
return result.join()
工作就像一個魅力保存'ResultSet'需要在它的構造函數的結果(或空列表)的列表。我向帖子提交了一個修改來糾正它。 – Prydie 2014-11-02 20:25:51
@Prydie,感謝您的反饋和糾正。 – falsetru 2014-11-03 03:09:58