2013-02-19 31 views
9

當我路線的任務,它的工作原理特定隊列:如何將任務鏈路由到芹菜中的特定隊列?

task.apply_async(queue='beetroot') 

但是,如果我創建了一個鏈條:

chain = task | task 

然後我寫

chain.apply_async(queue='beetroot') 

它似乎忽略隊列關鍵字並分配給默認的「芹菜」隊列。

這將是很好,如果芹菜支持鏈路由 - 在同一個隊列順序執行的所有任務。

回答

10

好吧我得到這個想通了。

您必須加入像隊列=或倒計時所需執行選項=給子任務的定義,或通過部分:

子任務定義:

from celery import subtask 

chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot') 

部分:

chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot') 

chain.apply_async() 

然後你通過執行鏈

,或者

chain.delay() 

這些任務將被髮送到 '甜菜' 隊列。這最後一個命令中的額外執行參數不會執行任何操作。在Chain(或Group,或任何其他Canvas基元)級別應用所有這些執行參數將是一種很好的方式。

+2

嗯,那部分例如沒有爲我工作,我回到了以下錯誤:類型錯誤和'AsyncResult'(使用3.0。23) – Clara 2014-04-10 10:49:05

+0

我在嘗試讓'chain'執行第二項任務時遇到了問題。問題:如果你在兩個任務上調用'apply_async',那真的是一個鏈嗎?兩種任務都不會自動執行嗎? 我試過你的語法,因爲在我的情況下,第一個子任務返回第二個使用的值。 – PritishC 2016-07-30 13:17:26

12

我不喜歡這樣寫道:

subtask = task.s(*myargs, **mykwargs).set(queue=myqueue) 
mychain = celery.chain(subtask, subtask2, ...) 
mychain.apply_async() 
+0

因此,如果在簽名中指定了「隊列」,但它不會在傳遞給「apply_async」時生效?你知道這個功能是否有一些很好的文檔? – dashesy 2015-04-10 19:52:08

+0

同一鏈中的不同子任務可以分配不同的隊列嗎? – ForeverWintr 2016-08-05 18:44:43

3

這是相當晚了,但我不認爲通過@mpaf提供的代碼是完全正確的。

上下文:在我的情況,我有兩個子任務,在外面的第一提供被傳遞給第二個作爲輸入自變量的返回值。我在執行第二個任務時遇到了麻煩 - 我在日誌中看到Celery會承認第二個任務是第一個任務的回調,但是它不會執行第二個任務。

這是我的非工作鏈碼 - :

from celery import chain 

chain(
    module.task1.s(arg), 
    module.task2.s() 
).apply_async(countdown=0.1, queue='queuename') 

使用@ MPAF的回答提供的語法,我得到了兩個任務要執行,但執行順序是隨意和第二子任務未得到確認作爲第一個回調。我的想法是瀏覽關於如何在子任務上明確設置隊列的文檔。 「AsyncResult」:| -

這是工作代碼:不支持的操作類型:

chain(
    module.task1.s(arg).set(queue='queuename'), 
    module.task2.s().set(queue='queuename') 
).apply_async(countdown=0.1) 
+0

爲我工作兄弟 – phacic 2017-07-03 11:21:54