2016-05-30 35 views
4

我有一些相當繁忙的芹菜隊列,但不知道哪些任務是有問題的。有沒有一種方法可以對結果進行彙總以確定哪些任務需要很長時間?我有2-4臺服務器上有10-20名工人。芹菜時間統計每任務名稱

使用redis作爲代理,並將結果作爲後端。我注意到Flower上的繁忙隊列,但無法弄清楚如何獲得按任務彙總的時間統計信息。

回答

1

方法1:

如果已經啓用了日誌記錄時,芹菜工人開始,他們記錄所採取的每項任務的時間。

$ celery worker -l info -A your_app --logfile celery.log 

這會產生這樣的

[2016-06-04 13:21:30,749: INFO/MainProcess] Task sig.add[a8b648eb-9674-44f0-90bd-71cfebe22f2f] succeeded in 0.00979363399983s: 3 
[2016-06-04 13:21:30,973: INFO/MainProcess] Received task: sig.add[7fd422e6-8f48-4dd2-90de-e213afbedc38] 
[2016-06-04 13:21:30,982: WARNING/Worker-2] called by small_task. LOL {'signal': <Signal: Signal>, 'result': 3, 'sender': <@task: sig.add of tasks:0x7fdf33146c50>} 

日誌可以篩選具有succeeded in線。使用[,:作爲分隔符分割這些行,打印任務的名稱和時間,然後對所有行進行排序。

$ grep ' succeeded in ' celery.log | awk -F'[ :\[]' '{print $9, $13}' | sort 
awk: warning: escape sequence `\[' treated as plain `[' 
sig.add 0.00775764500031s 
sig.add 0.00802627899975s 
sig.foo 12.00813863099938s 
sig.foo 15.00871706100043s 
sig.foo 12.00979363399983s 

正如你可以看到add是非常快的& foo是緩慢的。

方法2:

芹菜具有任務之後之前運行/ task_prerun_handlertask_postrun_handler信號。你可以連接函數來追蹤時間,然後記下某處的時間。

from time import time 
from celery.signals import task_prerun, task_postrun 


tasks = {} 
task_avg_time = {} 
Average = namedtuple('Average', 'cum_avg count') 


@task_prerun.connect 
def task_prerun_handler(signal, sender, task_id, task, args, kwargs): 
    tasks[task_id] = time() 


@task_postrun.connect 
def task_postrun_handler(signal, sender, task_id, task, args, kwargs, retval, state): 
    try: 
     cost = time() - tasks.pop(task_id) 
    except KeyError: 
     cost = None 

    if not cost: 
     return 

    try: 
     cum_avg, count = task_avg_time[task.name] 
     new_count = count + 1 
     new_avg = ((cum_avg * count) + cost)/new_count 
     task_avg_time[task.name] = Average(new_avg, new_count) 
    except KeyError: 
     task_avg_time[task.name] = Average(cost, 1) 

    # write to redis: task_avg_time 

參考文獻:https://stackoverflow.com/a/31731622/2698552