2012-10-10 45 views
11

我可以使用芹菜Group原始圖形作爲地圖/縮小工作流程中的傘形任務嗎?在地圖/縮小工作流程中使用的芹菜組任務

或更具體的說明:羣組中的子任務是否可以在多個服務器上的多個工作人員上運行

從文檔:

However, if you call apply_async on the group it will send a special 
grouping task, so that the action of calling the tasks happens in a worker 
instead of the current process 

這似乎暗示任務都發送給一個工人......

前3.0(仍然)人能斷火在taskset的子任務,其將運行在多個服務器上。問題是確定是否所有任務都已完成執行。這通常是通過輪詢所有不太優雅的子任務來完成的。 我想知道如果組元素可以用來緩解這個問題。

+0

至少在芹菜3.1中,通過常規的'group'命令完美分配任務,好像上述語句已從文檔中刪除 – Grozz

回答

23

我發現它可以使用和絃這樣的地圖減少像問題。

@celery.task(name='ic.mapper') 
def mapper(): 
    #split your problem in embarrassingly parallel maps 
    maps = [map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s()] 
    #and put them in a chord that executes them in parallel and after they finish calls 'reduce' 
    mapreduce = celery.chord(maps)(reduce.s())  
    return "{0} mapper ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname) 

@celery.task(name='ic.map') 
def map(): 
    #do something useful here 
    import time 
    time.sleep(10.0) 
    return "{0} map ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname) 

@celery.task(name='ic.reduce') 
def reduce(results): 
    #put the maps together and do something with the results 
    return "{0} reduce ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname) 

當三名工人/服務器集羣上執行映射器首先執行其中將您的問題映射器和創建再次提交給券商新的子任務。它們並行運行,因爲隊列被所有經紀人消費。此外,還會創建一個和絃任務,用於輪詢所有地圖以查看是否已完成。完成後,執行reduce任務,您可以將結果粘貼在一起。

總之:是的,這是可能的。感謝蔬菜傢伙!