2017-02-07 15 views
6

最近,我正在做一個關於GIT項目的實驗來理解大數據處理框架。Celery&Rabbitmq:WARNING/MainProcess]收到並刪除未知消息。錯誤的目的地?!? - 關於GIT的實驗

1,Git項目:https://github.com/esperdyne/celery-message-processing

,我們有以下組件:

1,AMPQ經紀人(RabbitMQ的):它可以作爲一個消息緩衝區,它可以作爲一個郵件箱交換消息爲不同的用戶!

2,worker:作爲服務的服務器,爲各種服務客戶端提供服務。 3,隊列(「芹菜」:它可以作爲其用於同時處理各種工人實例多處理容器

關鍵的配置可以看作是波紋管:

我們使用對象的凸出/ celery.py來定義應用程序,該定義可以如下圖所示:

app = Celery('proj', 
     broker='amqp://', 
     backend='redis://localhost', 
     include=['proj.tasks']) 

在這裏輸入的代碼

,當我們啓動應用程序:

1,當我們啓動應用程序時,我們看到了從rabbitmq生成的消息,但芹菜無法處理消息。

Parse.log看起來像這樣:[2017-02-04 14:28:06,909:WARNING/MainProcess]接收並刪除未知消息。錯誤的目的地?!?

,我們有以下問題:

4.2.1 AMQP機制 enter image description here 我們可以看到,AMQP工作作爲消息緩衝區,然後會有一個消息發送者和消息提取器:

在上圖中,誰是消息發送者,誰是消息提取者。

4.2.2消息定義 在我們的應用程序中,我們找不到代碼來定義要發送的消息或從AMQP接收消息。

4.2.3消息監視器 我們如何監視AMQP中的消息發送和接收。 希望老師能指導我們解決問題,並給我們一些細節

芹菜經紀人mechenism介紹!

注:錯誤日誌可以在這裏

[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? 



The full contents of the message body was: body: [[u'maildir/allen-  p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b) 
{content_type:'application/json' content_encoding:'utf-8' 
    delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'N\xfd\x17=\x00\x00': '[email protected]', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}} 


[2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672// 
[2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors 
[2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes 
[2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete 
[2017-02-04 15:47:23,530: INFO/MainProcess] [email protected] ready. 
[2017-02-04 15:47:24,890: INFO/MainProcess] sync with [email protected] 
[2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!? 

The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b) 
{content_type:'application/json' content_encoding:'utf-8' 
    delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'N\xfd\x17=\x00\x00': '[email protected]', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}} 

enter code here 

回答

10

看到這將是有益的給芹菜的版本和librabbitmq您正在使用。由於我有一個非常類似的問題,我猜你正在使用celery 4.0.2和librabbitmq 1.6.1。

在這種情況下,這是一個已知的兼容性問題,您可以參考https://github.com/celery/celery/issues/3675https://github.com/celery/librabbitmq/issues/93

第一個鏈接給你的建議來解決,即你的問題:

  • 卸載librabbitmq pip uninstall librabbitmq (您可能需要調用此命令多次)

  • 變化amqp的出現給pyamqp在你的borker網址。 (雖然不是在你的配置文件中,如果你正在使用它,這樣做對我來說不起作用)。

要更準確地回答您的其他問題:你說得對,有一個發件人和一個收件人。

當您撥打Celery(...)時,應用程序會創建發件人角色。它的作用之一是充當任務註冊表,如果您在app/base.py中查看它的實現,您會看到它實現了一個方法send_task,該方法直接由Task類的方法apply_async調用。此方法的作用是通過線路將代理的編組版本發送至代理,以便工作人員獲取。用於傳輸消息的應用協議是amqp,其實現是librabbitmq。

在電線的另一側,還有另一個由工作人員發起的取樣工作。用芹菜的說法,它被稱爲Consumer。你可以在worker/consumer/consumer.py中找到它的實現。你會看到它實現了一個create_task_handler,它依次定義了引起錯誤的on_task_received函數。這是在從工作人員獲取新任務並在下一行處理時調用的函數。

溶液因此建議在於改變AMQP協議的實現,使得TypeError未在on_task_received升高(這在我看來將通過編碼問題引起的)。

我希望它能回答您的所有問題,並讓您更清楚地瞭解芹菜的工作原理。我最終應該說,據我所知,「常規」使用Celery永遠不會要求你篡改這些內部類型,並且例如通過實現自定義任務類和自定義後端可以達到99%的可能需要。

+0

嗨阿尼斯:這是真的那麼NIC你在這個問題上幫助我!我必須打電話給你太棒了! 1)pip2。7安裝librabbitmq-1.6.1.tar.gz 2)pip2.7安裝celery-4.0.2.tar.gz。這正是我安裝的軟件版本!我遵循了您的建議!現在我的項目現在可以工作了!今晚我很開心!那個好朋友Anis幫我解決了這個問題! – arthur

0

只是爲了讓答案也位於此處。在線程阿尼斯是指23doors mentions芹菜4的新默認協議沒有發揮好與librabbitmq

顯然librabbitmq問題是關係到4.x的芹菜新的默認協議

他還提到,要解決這個問題,你可以通過設置使使用老式協議芹菜優惠(如果你使用Django):

CELERY_TASK_PROTOCOL = 1 

否則,您可以設置以下的celeryconf.py文件

app.conf.task_protocol = 1 

所有信貸23doors :)

相關問題