2013-02-14 28 views
1

好像郵件沒有正確放入隊列中。Django /芹菜/ Kombu工作人員錯誤:收到並刪除了未知消息。錯誤的目的地?

我使用Django與芹菜和Kombu使用Django自己的數據庫作爲代理後端。我需要的只是一個非常簡單的Pub/Sub設置。它最終會部署到Heroku,所以我使用工頭在本地運行。下面是相關的代碼和信息:

PIP凍結

Django==1.4.2 
celery==3.0.15 
django-celery==3.0.11 
kombu==2.5.6 

Procfile

web: source bin/activate; python manage.py run_gunicorn -b 0.0.0.0:$PORT -w 4; python manage.py syncdb 
celeryd: python manage.py celeryd -E -B --loglevel=INFO 

settings.py

# Celery configuration 
import djcelery 
CELERY_IMPORTS = ("api.tasks",) 
BROKER_URL = "django://localhost//" 
djcelery.setup_loader() 

put_message

with Connection(settings.BROKER_URL) as conn: 
    queue = conn.SimpleQueue('celery') 
    queue.put(id) 
    queue.close() 

API/tasks.py

@task() 
def process_next_task(): 
    with Connection(settings.BROKER_URL) as conn: 
    queue = conn.SimpleQueue('celery') 
    message = queue.get(block=True, timeout=1) 
    id = int(message.payload) 
    try: 
     Model.objects.get(id=id) 
    except Model.DoesNotExist: 
     message.reject() 
    else: 
     # Do stuff here 
     message.ack() 
    queue.close() 

在終端,foreman start作品就好了,並顯示此:

started with pid 31835 
17:08:22 celeryd.1 | started with pid 31836 
17:08:22 web.1  | /usr/local/foreman/bin/foreman-runner: line 41: exec: source: not found 
17:08:22 web.1  | 2013-02-14 17:08:22 [31838] [INFO] Starting gunicorn 0.16.1 
17:08:22 web.1  | 2013-02-14 17:08:22 [31838] [INFO] Listening at: http://0.0.0.0:5000 (31838) 
17:08:22 web.1  | 2013-02-14 17:08:22 [31838] [INFO] Using worker: sync 
17:08:22 web.1  | 2013-02-14 17:08:22 [31843] [INFO] Booting worker with pid: 31843 
17:08:22 web.1  | 2013-02-14 17:08:22 [31844] [INFO] Booting worker with pid: 31844 
17:08:22 web.1  | 2013-02-14 17:08:22 [31845] [INFO] Booting worker with pid: 31845 
17:08:22 web.1  | 2013-02-14 17:08:22 [31846] [INFO] Booting worker with pid: 31846 
17:08:22 celeryd.1 | [2013-02-14 17:08:22,858: INFO/Beat] Celerybeat: Starting... 
17:08:22 celeryd.1 | [2013-02-14 17:08:22,870: WARNING/MainProcess] [email protected] ready. 
17:08:22 celeryd.1 | [2013-02-14 17:08:22,873: INFO/MainProcess] consumer: Connected to django://localhost//. 
17:08:42 celeryd.1 | [2013-02-14 17:08:42,926: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? 
17:08:42 celeryd.1 | The full contents of the message body was: body: 25 (2b) {content_type:u'application/json' content_encoding:u'utf-8' delivery_info:{u'priority': 0, u'routing_key': u'celery', u'exchange': u'celery'}} 

最後兩行不立即顯示,但在我的API收到POST請求時顯示運行上面的put_message部分中的代碼。我已經嘗試過使用Kombu完全吹出的Producer和Consumer類,並得到了相同的結果。

海帶的SimpleQueue例如:http://kombu.readthedocs.org/en/latest/userguide/examples.html#hello-world-example
芹菜文檔:http://docs.celeryproject.org/en/latest/index.html

任何想法?

EDITED

的procfile內更改爲--loglevel=DEBUG終端輸出改變爲以下:

08:54:33 celeryd.1 | started with pid 555 
08:54:33 web.1  | started with pid 554 
08:54:33 web.1  | /usr/local/foreman/bin/foreman-runner: line 41: exec: source: not found 
08:54:36 web.1  | 2013-02-15 08:54:36 [557] [INFO] Starting gunicorn 0.16.1 
08:54:36 web.1  | 2013-02-15 08:54:36 [557] [INFO] Listening at: http://0.0.0.0:5000 (557) 
08:54:36 web.1  | 2013-02-15 08:54:36 [557] [INFO] Using worker: sync 
08:54:36 web.1  | 2013-02-15 08:54:36 [564] [INFO] Booting worker with pid: 564 
08:54:36 web.1  | 2013-02-15 08:54:36 [565] [INFO] Booting worker with pid: 565 
08:54:36 web.1  | 2013-02-15 08:54:36 [566] [INFO] Booting worker with pid: 566 
08:54:36 web.1  | 2013-02-15 08:54:36 [567] [INFO] Booting worker with pid: 567 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,480: DEBUG/MainProcess] [Worker] Loading modules. 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,484: DEBUG/MainProcess] [Worker] Claiming components. 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,484: DEBUG/MainProcess] [Worker] Building boot step graph. 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,484: DEBUG/MainProcess] [Worker] New boot order: {ev, queues, beat, pool, mediator, autoreloader, timers, state-db, autoscaler, consumer} 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,489: DEBUG/MainProcess] Starting celery.beat._Process... 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,490: DEBUG/MainProcess] celery.beat._Process OK! 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,491: DEBUG/MainProcess] Starting celery.concurrency.processes.TaskPool... 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,491: INFO/Beat] Celerybeat: Starting... 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,506: DEBUG/MainProcess] celery.concurrency.processes.TaskPool OK! 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,507: DEBUG/MainProcess] Starting celery.worker.mediator.Mediator... 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,507: DEBUG/MainProcess] celery.worker.mediator.Mediator OK! 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,507: DEBUG/MainProcess] Starting celery.worker.consumer.BlockingConsumer... 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,508: WARNING/MainProcess] [email protected] ready. 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,508: DEBUG/MainProcess] consumer: Re-establishing connection to the broker... 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,510: INFO/MainProcess] consumer: Connected to django://localhost//. 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,628: DEBUG/Beat] Current schedule: 
08:54:37 celeryd.1 | <Entry: celery.backend_cleanup celery.backend_cleanup() {<crontab: * 4 * * * (m/h/d/dM/MY)>} 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,629: DEBUG/Beat] Celerybeat: Ticking with max interval->5.00 minutes 
08:54:37 celeryd.1 | [2013-02-15 08:54:37,658: DEBUG/Beat] Celerybeat: Waking up in 5.00 minutes. 
08:54:38 celeryd.1 | [2013-02-15 08:54:38,110: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->16 
08:54:38 celeryd.1 | [2013-02-15 08:54:38,126: DEBUG/MainProcess] consumer: Ready to accept tasks! 
08:55:08 celeryd.1 | [2013-02-15 08:55:08,184: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? 
08:55:08 celeryd.1 | The full contents of the message body was: body: 26 (2b) {content_type:u'application/json' content_encoding:u'utf-8' delivery_info:{u'priority': 0, u'routing_key': u'celery', u'exchange': u'celery'}} 
+0

嘗試改變--loglevel =運行芹菜到--logl時的信息evel = DEBUG,看看它是否提供了更有用的消息。 – njbooher 2013-02-14 22:45:42

+0

當它說「消息體的全部內容是:body:25(2b)'時,這意味着body是2個字節大小,由字符」2「和」5「組成,這當然不會看起來就像一個有效的任務消息! – asksol 2013-02-15 12:55:31

+0

@njbooher我剛剛添加了DEBUG信息,但它似乎沒有幫助。 – Vail 2013-02-15 13:59:56

回答

3

問題是雙重的:

消息格式是錯誤的。根據@asksol提供的http://docs.celeryproject.org/en/latest/internals/protocol.html文檔,它需要是一本字典,並且遵循該頁底部的示例。

實例消息

{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77", 
"task": "celery.task.PingTask", 
"args": [], 
"kwargs": {}, 
"retries": 0, 
"eta": "2009-11-17T12:30:56.527191"} 

put_message

with Connection(settings.BROKER_URL) as conn: 
    queue = conn.SimpleQueue('celery') 
    message = { 
    'task': 'process-next-task', 
    'id': str(uuid.uuid4()), 
    'args': [id], 
    "kwargs": {}, 
    "retries": 0, 
    "eta": str(datetime.datetime.now()) 
    } 
    queue.put(message) 
    queue.close() 

的Procfile進程是運行任務的一個消費者,所以沒有必要設立一個消費者的任務範圍內。我只需要使用發佈消息時發送的參數。

API/tasks.py

@task(serializer='json', name='process-next-task') 
def process_next_task(id): 
    try: 
    Model.objects.get(id=int(id)) 
    except Model.DoesNotExist: 
    pass 
    else: 
    # Do stuff here 
3

這不是對於這個問題的解決方案,
但是大關使用celery4.0.2

輸出一樣的問題:

[2017-02-09 17:45:12,136: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? 

The full contents of the message body was: body: [[], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': None}] (77b) 
{content_type:'application/json' content_encoding:'utf-8' 
    delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'test2', 'delivery_tag': 1L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '3f6295b3-c85c-4188-b424-d186da7e2edb', 'N\xfd\x17=\x00\x00': '[email protected]', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'celeryserver.tasks.test', '\xae\xbf': '3f6295b3-c85c-4188-b424-d186da7e2edb', '\x11s\x1f\xd8\x00\x00\x00\x00': '()', 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}} 

解決方案: https://github.com/celery/celery/issues/3675

# call this command many times, until it says it's not installed 
pip uninstall librabbitmq 

感謝https://github.com/ask

+1

如果它不是解決方案,爲什麼你在這裏發佈? – 2017-02-09 10:19:04

+0

謝謝。你爲我節省了很多時間。 – acidjunk 2017-02-17 11:46:17

0

顯然librabbitmq問題在芹菜4.x的有關新的默認協議你可以,如果你在celeryconf.py

使用Django或設置app.conf.task_protocol = 1然後你就可以從另一個任務隊列任務設置由要麼把CELERY_TASK_PROTOCOL = 1切換到以前的協議版本。

0

我:[芹菜3.1.25;的django = 1.11]

i。從settings.py

CELERY_QUEUES = { 
    # "implicit":  {"exchange": "implicit", "routing_key": "implicit"}, 
} 

# I declare queue 
ch = settings.CELERY_APP.connection().channel() 
ex = Exchange("implicit", channel=ch) 
q = Queue(name="implicit", routing_key="implicit", channel=ch, exchange=ex) 
q.declare() # <-- here 
producer = ch.Producer(routing_key=q.routing_key, exchange=q.exchange) 
# publish 
producer.publish("text") 

刪除或U可以使用第二個版本
從海帶文檔

from kombu import Exchange, Queue 
task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks') 

producer.publish(
    {'hello': 'world'}, 
    retry=True, 
    exchange=task_queue.exchange, 
    routing_key=task_queue.routing_key, 
    declare=[task_queue], # <-- declares exchange, queue and binds. 
) 
相關問題