2014-01-28 26 views
1

我想創建一個在一組中的任務是和絃自己的和絃。所以想法是 - 每個單獨的子弦首先執行,然後這些和絃的結果最終被父任務的回調使用。這裏是我想要做如何在自己的任務是和絃的芹菜中創建一個和絃

代碼 - tasks.py

import celery 
from celery import chord 

@celery.task 
def add(x, y): 
    print "add called..." 
    return x + y 

@celery.task 
def tsum(numbers): 
    return sum(numbers) 

@celery.task 
def gr_add(x): 
    print "**** gr_add called" 
    c = chord(add.subtask((i,i)) for i in range(2*x, 2*x+2)) 
    r = c(tsum.subtask()) 
    return r.get(timeout=120) 

- 然後我test-chord-chord.py文件包含

from tasks import add, tsum, gr_add 
from celery import chord 

c = chord(gr_add.subtask((i,)) for i in range(2)) 
result = c(tsum.subtask()) 
print result.get(timeout=5) 

然而,這未按預期執行。我做得對嗎?有沒有另外一種方法來達到上述目的?

回答

0

我終於能夠弄清楚是什麼問題。雖然不太清楚如何解決它。罪魁禍首是呼叫上面的gr_add回調,特別是行 - r = c(tsum.subtask())。這會阻止 - 因此運行此任務的工作人員將被阻止。現在當我只有一名工人時 - 以上是一個問題。當我創建了多個工作人員(在這種情況下,特別是三個 - 因爲原來的和絃創建了兩個和絃),每個工作人員只拾取一個任務CELERY_PREFTCH_MULTIPLIER = 1CELERY_ACKS_LATE=True。三名工作人員在隊列中揀起了任務 - 第一個拿起第一個和絃,現在被封鎖了。第二個人拿起第二個和絃,現在被阻擋。第三個選出了第一個和絃的第一個任務 - 計算,退出 - 暢通無阻。拿起下一個任務 - 計算/退出 - 取消阻止。第一個現在解鎖。拾取第二個和絃的第一個任務 - 計算/退出 - 解鎖。其中一名工作人員1或3拿起計算出和絃的第二個子任務,計算出來,退出。現在所有三個都被阻止,結果按照預期計算。總的來說 - 這是一個糟糕的設計 - 我應該做的是'創建另一個子任務 - 獲得和絃的結果,如果它們還沒有成功,那麼在一段時間後重試。這些重試間隔之間的時間間隔將允許工作人員完成各個子任務,最後可以調用主調用和絃的回調。

編輯1:終於能夠弄清楚如何處理這個問題 - 在由外部和絃調用的任務中阻塞是一個糟糕的主意。相反,我所做的是 - 從被調用的任務 返回AsyncResult,然後創建一個輪詢任務,調查結果爲成功或失敗。然後計算此輪詢器任務中的結果。此任務在特定持續時間後重試。 (這幾乎就像unlock_chord任務 - 只是我自己在一個單獨的任務中由代碼控制 - 而不是讓芹菜處理它)

+0

介意分享您的代碼? – embedded

+0

抱歉沒有它方便! – gabhijit