我正在使用Celery從Django應用程序處理異步任務。大多數任務都很短,幾秒鐘後即可運行,但我有一項任務需要幾個小時。如何按任務名稱限制芹菜任務?
由於我的服務器上處理的限制,芹菜被配置爲只運行2個任務一次。這意味着如果有人啓動了其中兩項長時間運行的任務,它可以在幾個小時內有效阻止所有其他Celery處理站點,這非常糟糕。
有什麼辦法來配置芹菜,因此只處理一種類型的任務不超過一次一個多嗎?喜歡的東西:
@task(max_running_instances=1)
def my_really_long_task():
for i in range(1000000000):
time.sleep(6000)
注意,我不想取消的my_really_long_task
所有其他發射。我只是不想讓他們馬上開始,並且只有在所有其他同名任務完成後纔開始。由於這似乎不被Celery支持,我目前的解決方法是查詢任務中的其他任務,如果我們發現其他正在運行的實例,則重新安排自己以後再運行,例如,
from celery.task.control import inspect
def get_all_active_celery_task_names(ignore_id=None):
"""
Returns Celery task names for all running tasks.
"""
i = inspect()
task_names = defaultdict(int) # {name: count}
if i:
active = i.active()
if active is not None:
for worker_name, tasks in i.active().iteritems():
for task in tasks:
if ignore_id and task['id'] == ignore_id:
continue
task_names[task['name']] += 1
return task_names
@task
def my_really_long_task():
all_names = get_all_active_celery_task_names()
if 'my_really_long_task' in all_names:
my_really_long_task.retry(max_retries=100, countdown=random.randint(10, 300))
return
for i in range(1000000000):
time.sleep(6000)
有沒有更好的方法來做到這一點?
我知道其他hacky解決方案,如this,但設置一個單獨的memcache服務器來跟蹤任務唯一性比我上面使用的方法更不可靠,也更復雜。
我結束了加入稱爲TaskLock一個Django表和所使用的「與tasklock(個體)」表達來檢查/調用/任務內解除鎖定。我也想看看少哈克的東西... – rrauenza
請確保您還使用的時限在你的任務,如果你的鎖都具有過期... – rrauenza