當我路線的任務,它的工作原理特定隊列:如何將任務鏈路由到芹菜中的特定隊列?
task.apply_async(queue='beetroot')
但是,如果我創建了一個鏈條:
chain = task | task
然後我寫
chain.apply_async(queue='beetroot')
它似乎忽略隊列關鍵字並分配給默認的「芹菜」隊列。
這將是很好,如果芹菜支持鏈路由 - 在同一個隊列順序執行的所有任務。
當我路線的任務,它的工作原理特定隊列:如何將任務鏈路由到芹菜中的特定隊列?
task.apply_async(queue='beetroot')
但是,如果我創建了一個鏈條:
chain = task | task
然後我寫
chain.apply_async(queue='beetroot')
它似乎忽略隊列關鍵字並分配給默認的「芹菜」隊列。
這將是很好,如果芹菜支持鏈路由 - 在同一個隊列順序執行的所有任務。
好吧我得到這個想通了。
您必須加入像隊列=或倒計時所需執行選項=給子任務的定義,或通過部分:
子任務定義:
from celery import subtask
chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot')
部分:
chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot')
chain.apply_async()
:
然後你通過執行鏈
,或者
chain.delay()
這些任務將被髮送到 '甜菜' 隊列。這最後一個命令中的額外執行參數不會執行任何操作。在Chain(或Group,或任何其他Canvas基元)級別應用所有這些執行參數將是一種很好的方式。
我不喜歡這樣寫道:
subtask = task.s(*myargs, **mykwargs).set(queue=myqueue)
mychain = celery.chain(subtask, subtask2, ...)
mychain.apply_async()
因此,如果在簽名中指定了「隊列」,但它不會在傳遞給「apply_async」時生效?你知道這個功能是否有一些很好的文檔? – dashesy 2015-04-10 19:52:08
同一鏈中的不同子任務可以分配不同的隊列嗎? – ForeverWintr 2016-08-05 18:44:43
這是相當晚了,但我不認爲通過@mpaf提供的代碼是完全正確的。
上下文:在我的情況,我有兩個子任務,在外面的第一提供被傳遞給第二個作爲輸入自變量的返回值。我在執行第二個任務時遇到了麻煩 - 我在日誌中看到Celery會承認第二個任務是第一個任務的回調,但是它不會執行第二個任務。
這是我的非工作鏈碼 - :
from celery import chain
chain(
module.task1.s(arg),
module.task2.s()
).apply_async(countdown=0.1, queue='queuename')
使用@ MPAF的回答提供的語法,我得到了兩個任務要執行,但執行順序是隨意和第二子任務未得到確認作爲第一個回調。我的想法是瀏覽關於如何在子任務上明確設置隊列的文檔。 「AsyncResult」:| -
這是工作代碼:不支持的操作類型:
chain(
module.task1.s(arg).set(queue='queuename'),
module.task2.s().set(queue='queuename')
).apply_async(countdown=0.1)
爲我工作兄弟 – phacic 2017-07-03 11:21:54
嗯,那部分例如沒有爲我工作,我回到了以下錯誤:類型錯誤和'AsyncResult'(使用3.0。23) – Clara 2014-04-10 10:49:05
我在嘗試讓'chain'執行第二項任務時遇到了問題。問題:如果你在兩個任務上調用'apply_async',那真的是一個鏈嗎?兩種任務都不會自動執行嗎? 我試過你的語法,因爲在我的情況下,第一個子任務返回第二個使用的值。 – PritishC 2016-07-30 13:17:26