2017-08-29 44 views
0

我需要以下流程:芹菜 - 一個任務運行N個任務,等待他們和處理結果

  • ParentTask運行第一
  • 在某些時候,它產生的ChildTask N個實例,其運行在平行
  • ParentTask等待那些完成,收集結果,莫名其妙地處理它們,並完成

這似乎是很容易的。不幸的是,從任務中調用Task().delay()(我用它來調用任務)似乎完全被忽略。我完全迷失在這裏。

如果你喜歡代碼的方法更多,我也包括它。

from celery.task import Task 
from celery.result import AsyncResult 

class ParentTask(Task): 
    def run(self, *args, **kwargs): 
     # do some stuff 
     ids = [ChildTask().delay().id for _ in range(N)] # this seems to do nothing here 
     results = [AsyncResult(t) for t in ids] 
     while not all([r.ready() for r in results]): # wait for child tasks to finish 
      sleep(.100) 
     # do some stuff again 
     # return results 

class ChildTask(Task): 
    def run(self, *args, **kwargs): 
     # do some child stuff 
     # return child results 

ParentTask().delay() # this delay works fine 

感謝您的任何線索!

+0

您需要[Canvas](http://docs.celeryproject.org/en/latest/userguide/canvas.html#groups) –

+0

我可以將這些組,鏈,和絃等組合在一起嗎?如果是這樣,你能否給我寫一段代碼片段來說明使用情況? – karlosss

回答

0

好吧,我明白了。工作方式可以像這樣(當然,任務可以做任何需要的東西):

from time import sleep 
from celery.task import Task 
from celery import chain, group 

class PreTask(Task): 
    def run(self, *args, **kwargs): 
     x = 0 
     for i in range(100000): 
      x += 1 
     return x 


class MidTask(Task): 
    def run(self, *args, **kwargs): 
     sleep(5) 
     return 42 


class PostTask(Task): 
    def run(self, *args, **kwargs): 
     return args 


# call it like this 
res = chain(PreTask().s() | group(MidTask().s() for _ in range(5)) | PostTask().s()).apply_async() 

# and get the result for example like this 
while(True): 
    if res.ready(): 
     print(res.get()) 
    sleep(1) 

希望它可以幫助別人。