2012-04-08 76 views
1

我有一個提醒類型的應用程序,它使用「eta」參數在芹菜中安排任務。如果提醒對象中的參數改變(例如提醒時間),則我撤銷先前發送的任務並排隊新的任務。如何在多個芹菜進程中跟蹤撤銷的任務

我想知道在芹菜重啓過程中是否有任何好方法跟蹤撤銷的任務。我希望能夠迅速地向上/向下縮放celeryd進程,並且看起來在revoke命令發送後啓動的任何celeryd進程仍將執行該任務。

這樣做的一種方法是保留已撤銷任務ID的列表,但此方法將導致列表隨意增大。修剪此列表需要保證該任務不再處於RabbitMQ隊列中,這似乎不可能。

我也試過對每個芹菜工人使用共享--statedb文件,但似乎只是更新了terminateb上的工作,因此不適合我想要完成的語句。

在此先感謝!

回答

0

我不得不在我的項目中做類似的事情,並使用celerycamdjango-admin-monitor。監視器會對任務進行快照並定期將其保存到數據庫中。並且有一個很好的用戶界面來瀏覽和檢查所有任務的狀態。你甚至可以使用它even if your project is not Django based

0

我前段時間實現了類似於此的解決方案,而我提出的解決方案與您的解決方案非常相似。

我解決這個問題的方法是讓作業在作業運行時(通過傳遞主鍵,如文檔建議)從數據庫中獲取Task對象。就你而言,在發送提醒之前,工作人員應該執行檢查以確保任務已準備好運行。如果沒有,它應該簡單地返回而不做任何工作(假設ETA已經改變,另一名工人將接受新工作)。

+0

這是不是仍然在理論上要求我保留在數據庫中所有以前的任務的結果,因爲任何修剪會導致在重新啓動新的工作進程擔保損失不要運行以前撤銷的任務? – 2012-04-08 13:26:09

+0

我假設您已經設置了某種數據庫模型,您還使用它來存儲任務ID,以便在必要時撤銷該任務?如果是這樣,你可以給這個模型添加一個'completed'標誌。 – 2012-04-08 13:29:05

+0

我只想出了一個選擇:保留一個撤銷任務ID的列表,並且每次celeryd進程啓動或重新啓動後,腳本都會遍歷整個列表並重新發送撤銷命令。這樣我們只需保留自上次腳本運行以來已被撤銷的任務ID。你能看到這個實現中的任何缺陷嗎? – 2012-04-08 13:30:13

1

有趣的問題,我認爲應該很容易解決使用廣播命令。 如果新員工啓動,它會請求所有其他員工將其撤銷的 任務轉儲給新員工。添加了兩個新的遠程控制命令, 您可以輕鬆地通過使用@Panel.register添加新的命令,

模塊control.py:

from celery.worker import state 
from celery.worker.control import Panel 

@Panel.register 
def bulk_revoke(panel, ids): 
    state.revoked.update(ids) 

@Panel.register 
def broadcast_revokes(panel, destination): 
    panel.app.control.broadcast("bulk_revoke", arguments={ 
     "ids": list(state.revoked)}, 
     destination=destination) 

將它添加到CELERY_IMPORTS:

CELERY_IMPORTS = ("control",) 

唯一缺少的問題現在是連接它,以便新員工在啓動時觸發broadcast_revokes。我想你可以使用這個worker_ready 信號:

from celery import current_app as celery 
from celery.signals import worker_ready 

def request_revokes_at_startup(sender=None, **kwargs): 
    celery.control.broadcast("broadcast_revokes", 
          destination=sender.hostname)