有趣的問題,我認爲應該很容易解決使用廣播命令。 如果新員工啓動,它會請求所有其他員工將其撤銷的 任務轉儲給新員工。添加了兩個新的遠程控制命令, 您可以輕鬆地通過使用@Panel.register
添加新的命令,
模塊control.py:
from celery.worker import state
from celery.worker.control import Panel
@Panel.register
def bulk_revoke(panel, ids):
state.revoked.update(ids)
@Panel.register
def broadcast_revokes(panel, destination):
panel.app.control.broadcast("bulk_revoke", arguments={
"ids": list(state.revoked)},
destination=destination)
將它添加到CELERY_IMPORTS:
CELERY_IMPORTS = ("control",)
唯一缺少的問題現在是連接它,以便新員工在啓動時觸發broadcast_revokes
。我想你可以使用這個worker_ready
信號:
from celery import current_app as celery
from celery.signals import worker_ready
def request_revokes_at_startup(sender=None, **kwargs):
celery.control.broadcast("broadcast_revokes",
destination=sender.hostname)
這是不是仍然在理論上要求我保留在數據庫中所有以前的任務的結果,因爲任何修剪會導致在重新啓動新的工作進程擔保損失不要運行以前撤銷的任務? – 2012-04-08 13:26:09
我假設您已經設置了某種數據庫模型,您還使用它來存儲任務ID,以便在必要時撤銷該任務?如果是這樣,你可以給這個模型添加一個'completed'標誌。 – 2012-04-08 13:29:05
我只想出了一個選擇:保留一個撤銷任務ID的列表,並且每次celeryd進程啓動或重新啓動後,腳本都會遍歷整個列表並重新發送撤銷命令。這樣我們只需保留自上次腳本運行以來已被撤銷的任務ID。你能看到這個實現中的任何缺陷嗎? – 2012-04-08 13:30:13