2012-11-07 20 views
16

我想從Celery任務返回的列表中創建一個組,以便爲​​任務結果集中的每個項目添加一個任務。如何將返回列表的Celery任務鏈接到組中?

下面是一個簡單的代碼示例來解釋用例。 ???應該是上一個任務的結果。

@celery.task 
def get_list(amount): 
    # In reality, fetch a list of items from a db 
    return [i for i in range(amount)] 

@celery.task 
def process_item(item): 
    #do stuff 
    pass 

process_list = (get_list.s(10) | group(process_item.s(i) for i in ???)) 

我可能無法正確處理這個,但我敢肯定它不是安全地從任務中調用任務:

@celery.task 
def process_list(): 
    for i in get_list.delay().get(): 
     process_item.delay(i) 

我不從秒任務需要的結果。

+0

的確,不*從任務中調用任務。這會造成死鎖。假設你有一名工人。你把你的任務叫做工作1,然後調用第二個任務。沒有工人來處理這個任務,一切都會掛起。當你添加工作人員時,這種不潔現象會稍微好一些,但是你會一直在用一個任務捆綁多個工人(並且失去並行性)。 – mlissner

回答

29

您可以使用中間任務獲得此類行爲。下面是創建一個類似於您所建議的方法的「地圖」演示。

from celery import task, subtask, group 

@task 
def get_list(amount): 
    return [i for i in range(amount)] 

@task 
def process_item(item): 
    # do stuff 
    pass 

@task 
def dmap(it, callback): 
    # Map a callback over an iterator and return as a group 
    callback = subtask(callback) 
    return group(callback.clone([arg,]) for arg in it)() 

# runs process_item for each item in the return of get_list 
process_list = (get_list.s(10) | dmap.s(process_item.s())) 

信貸要問索利姆給我這個建議時,我請他幫忙在一個類似的問題。

+1

請注意,克隆只會執行淺拷貝。如果你想克隆一個「複雜」簽名(比如鏈,組或者和絃),你需要(ab)使用python的deepcopy,如[celery issue 2251](https://github.com/celery) /芹菜/問題/ 2251)。或者你將'callback = subtask(callback)'移動到for循環中創建函數並刪除'clone'。 –

+0

我已閱讀了上述評論十幾次,但我不明白。你能否提供一個例子,@LuisNell? – mlissner

+0

@mlissner鑑於上述代碼,我的意思是以下內容。如果我們假設「回調」不只是一個單一的任務,而是一個複雜的工作流程(一個組或一個和絃),你不能簡單地使用'.clone()'。小組和和絃可能非常複雜(一組小組等)。在這種情況下,您不能簡單地使用'.clone',因爲它只會創建回調簽名的淺表副本。這意味着參數不會被正確傳遞。爲了確保一切按預期工作,您需要使用'deepcopy',就像我原來的評論中提到的那樣 - 是否更加清楚?如果沒有,我會再試一次。 –

相關問題