2013-03-22 89 views
3

我用celery.chord(...)來創建一組任務和該組中的所有任務完成後調用的方法。打破死循環芹菜

我使用AMQP結果後端(但我想切換到memcached的)。

我的工人版畫這一行了一遍又一遍,每一秒。我不知道如何打破這個無限循環。我可以訪問rabbitMQ網頁界面,但是我找不到ID爲「32ba5fe4 -...」的東西。

[2013-03-22 14:18:26,896: INFO/MainProcess] Task celery.chord_unlock[32ba5fe4-918c-480f-8a78-a310c11d0c3a] retry: Retry in 1s 
[2013-03-22 14:18:26,897: INFO/MainProcess] Got task from broker: celery.chord_unlock[32ba5fe4-918c-480f-8a78-a310c11d0c3a] eta:[2013-03-22 13:18:27.895123+00:00] 

這是一個測試環境。沒有數據可能會丟失。

我用芹菜3.0.16

回答

4

它不應該是一個無限循環。

如果和絃子任務已經完成了調用該合併回調任務的celery.chord_unlock任務檢查。如果不是,它會安排在第二秒再次檢查。一旦你的和絃任務完成,你將不再在日誌中看到這些消息。

編輯:您可以撤銷chord_unlock任務停止循環

celery.control.revoke('32ba5fe4-918c-480f-8a78-a310c11d0c3a') 
1

我有同樣的問題。要停止循環,我安裝flower,然後從任務在Web界面菜單撤銷任務。 撤銷按鈕位於單擊任務的UUID後出現的任務詳細信息頁面中。

1

對於一個全面的檢查,我在工人啓動通過信號設置max_retries

from celery.signals import worker_init 
@worker_init.connect 
def limit_chord_unlock_tasks(worker, **kwargs): 
    """ 
    Set max_retries for chord.unlock tasks to avoid infinitely looping 
    tasks. (see celery/celery#1700 or celery/celery#2725) 
    """ 
    task = worker.app.tasks['celery.chord_unlock'] 
    if task.max_retries is None: 
     retries = getattr(worker.app.conf, 'CHORD_UNLOCK_MAX_RETRIES', None) 
     task.max_retries = retries 

然後一個CHORD_UNLOCK_MAX_RETRIES變量添加到我的芹菜配置。