2017-05-15 60 views
1

我正在使用Celery從Django應用程序處理異步任務。大多數任務都很短,幾秒鐘後即可運行,但我有一項任務需要幾個小時。如何按任務名稱限制芹菜任務?

由於我的服務器上處理的限制,芹菜被配置爲只運行2個任務一次。這意味着如果有人啓動了其中兩項長時間運行的任務,它可以在幾個小時內有效阻止所有其他Celery處理站點,這非常糟糕。

有什麼辦法來配置芹菜,因此只處理一種類型的任務不超過一次一個多嗎?喜歡的東西:

@task(max_running_instances=1) 
def my_really_long_task(): 
    for i in range(1000000000): 
     time.sleep(6000) 

注意,我不想取消的my_really_long_task所有其他發射。我只是不想讓他們馬上開始,並且只有在所有其他同名任務完成後纔開始。由於這似乎不被Celery支持,我目前的解決方法是查詢任務中的其他任務,如果我們發現其他正在運行的實例,則重新安排自己以後再運行,例如,

from celery.task.control import inspect 

def get_all_active_celery_task_names(ignore_id=None): 
    """ 
    Returns Celery task names for all running tasks. 
    """ 
    i = inspect() 
    task_names = defaultdict(int) # {name: count} 
    if i: 
     active = i.active() 
     if active is not None: 
      for worker_name, tasks in i.active().iteritems(): 
       for task in tasks: 
        if ignore_id and task['id'] == ignore_id: 
         continue 
        task_names[task['name']] += 1 
    return task_names 

@task 
def my_really_long_task(): 

    all_names = get_all_active_celery_task_names() 
    if 'my_really_long_task' in all_names: 
     my_really_long_task.retry(max_retries=100, countdown=random.randint(10, 300)) 
     return 

    for i in range(1000000000): 
     time.sleep(6000) 

有沒有更好的方法來做到這一點?

我知道其他hacky解決方案,如this,但設置一個單獨的memcache服務器來跟蹤任務唯一性比我上面使用的方法更不可靠,也更復雜。

+0

我結束了加入稱爲TaskLock一個Django表和所使用的「與tasklock(個體)」表達來檢查/調用/任務內解除鎖定。我也想看看少哈克的東西... – rrauenza

+0

請確保您還使用的時限在你的任務,如果你的鎖都具有過期... – rrauenza

回答

1

一種替代的解決方案是排隊成my_really_long_task一個單獨的隊列中。

my_really_long_task.apply_async(*args, queue='foo') 

然後啓動一個併發性爲1的工作器來使用這些任務,這樣一次只能執行一個任務。

celery -A foo worker -l info -Q foo 
+0

這似乎像乾淨的解決方案,雖然需要一點DEVOPS開銷。謝謝。 – Cerin