2011-12-14 42 views
33

我正在使用Celery來管理異步任務。然而,有時候,芹菜過程會停止,導致沒有任何任務被執行。我希望能夠檢查芹菜的狀態,並確保一切工作正常,如果我檢測到任何問題,都會向用戶顯示錯誤消息。從Celery Worker的文檔看來,我似乎可以使用pinginspect來做這件事,但是ping感覺很難受,並且不清楚如何使用inspect(如果inspect().registered()是空的)。檢測芹菜是否可用/正在運行

任何指導,將不勝感激。基本上我尋找的是類似的方法,以便:

def celery_is_alive(): 
    from celery.task.control import inspect 
    return bool(inspect().registered()) # is this right?? 

編輯:它甚至沒有像註冊()可以用芹菜2.3.3(即使2.1的文檔列表中的話)。也許ping是正確的答案。

編輯:平還沒有出現做我認爲會做什麼,所以仍然不知道答案在這裏。

+0

下面沒有沒有答案的爲你工作?作爲一個有類似問題的人解決,我很想要一些確認。 – kojiro 2012-06-13 22:19:37

回答

44

這是我一直在使用的代碼。 celery.task.control.Inspect.stats()返回一個字典,其中包含有關當前可用工作人員的大量詳細信息,如果沒有工作人員正在運行,則返回None;如果無法連接到消息代理,則返回IOError。我正在使用RabbitMQ - 其他消息傳遞系統的行爲可能略有不同。這在Celery 2.3.x和2.4.x中有效;我不確定它有多遠。

def get_celery_worker_status(): 
    ERROR_KEY = "ERROR" 
    try: 
     from celery.task.control import inspect 
     insp = inspect() 
     d = insp.stats() 
     if not d: 
      d = { ERROR_KEY: 'No running Celery workers were found.' } 
    except IOError as e: 
     from errno import errorcode 
     msg = "Error connecting to the backend: " + str(e) 
     if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED': 
      msg += ' Check that the RabbitMQ server is running.' 
     d = { ERROR_KEY: msg } 
    except ImportError as e: 
     d = { ERROR_KEY: str(e)} 
    return d 
+0

爲我工作:) – kojiro 2012-06-15 03:57:45

+6

我發現上面每次運行時都會將兩個reply.celery.pidbox隊列添加到rabbitmq。這會導致rabbitmq內存使用量的增加。 – kojiro 2012-07-09 15:20:47

2

以下爲我工作:

import socket 
from kombu import Connection 

celery_broker_url = "amqp://localhost" 

try: 
    conn = Connection(celery_broker_url) 
    conn.ensure_connection(max_retries=3) 
except socket.error: 
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url))