2017-08-31 18 views
0

我遇到與下面這種芹菜工作流程的一些非常奇怪的行爲鏈:芹菜帆布組傳遞參數太多組成任務

workflow = group(
    chain(task1.s(), task2.s()), 
    chain(task3.s(), task4.s()), 
) 

這是在Django的背景下。

當我調用工作流程如下:

workflow.apply_async((n,))

...對於n的任意整數值,在每個鏈(task1task3)第一個任務將會失敗,像以下一個TypeError (從celery events拍攝):

args: [9, 8, 7, 5, 4, 3] 
    kwargs: {} 
    retries: 0 
    exception: TypeError('task1() takes exactly 1 argument (6 given)',) 
    state: FAILURE 

後的第一個參數是始終工作流之前用所謂的論據。所以,在這個例子中,我在這個時候調用了workflow.apply_async((9,)),其他的數字是以前通過的值。每一次,傳遞給task1task3的惡意論據都是一樣的。

我很想試試把它作爲一個錯誤報告發給芹菜,但我還不確定這個錯誤是不是我的。

事情我已經排除了:

  • 我肯定傳遞參數我想我傳遞給workflow.apply_async。我已經單獨構建並記錄了我傳遞的元組,以確保這一點。
  • 這與將一個列表(即可變)傳遞給apply_async而不是元組無關。我肯定會傳遞一個元組(即不可變)。

的只有適度的不尋常的事情對我的設置,雖然我不能看到它是如何連接的,它是task1task3與不同的隊列配置。

回答

0

曾經碰到過類似的問題,當我與芹菜task.chunks工作()

我具有包含到一個單一的元組的產品清單解決它。例如,

假設任務log_i()是shared_task基本上記錄變量i,我希望通過分塊我會做記錄所有i s的名單 -

# log_i shared Task 
@shared_task(bind=True) 
def log_i(self, i): 
    logger.info(i) 
    return i 

而且

# some calling site 
# ... 
very_long_list = [{"i1": i, "i2": i+i, "i3": i**2} for i in range(1000)] 
res = log_i.chunks(zip(very_long_list), 10)() 
print(res.get()) 

# ... 

自我提醒,這樣做的東西,如 -

# ... 
res = log_i.chunks(very_long_list, 10)() 
# ... 

將失敗,並顯示錯誤信息,當列表中的項目不是可迭代項目時。

壓縮將項目按原樣移動到新元組中,通過此功能,您可以將其捕獲到log_i任務中的單個參數中。