2017-08-24 32 views
0

我對django頻道和消息隊列有點新。當使用rabbitmq和django頻道msgpack錯誤

我的要求如下:

網頁作出WebSocket連接到Django的服務器 Django的服務器需要訂閱頻道(基於用戶名)一個RabbitMQ的服務器 當消息訂閱的頻道到達時,路線它到相應的用戶網絡插座,然後將網頁更新UI 我有一個基本的WebSocket示例應用程序的工作按http://channels.readthedocs.io/en/stable/

現在我想要來處理來自通道的RabbitMQ消息

我有以下路徑:

routes = [ 
    route("websocket.receive", ws_message), 
    route("websocket.connect", ws_accept), 
    route("hello", hello_message), 
] 

及以下的消費者:

import sys 
import logging 

logger = logging.getLogger('test') 

def ws_message(message): 
    logger.debug('---------- Got message on web socket --------------------') 
    message.reply_channel.send({"text": message.content['text']}) 


def ws_accept(message): 
    logger.debug('--------- Accepted Web Socket connection ----------------') 
    message.reply_channel.send({"accept": True}) 


def hello_message(): 
    logger.debug('---------- Got message on MQ --------------------') 

我寫了一個小的外部腳本來發送郵件到 「你好」 頻道:

#!/usr/bin/env python 

import pika 
import sys 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='hello', arguments={'x-expires': 120000, 'x-dead-letter-exchange': 'dead-letters'}) 

print 'Sending: ' + sys.argv[1]; 

channel.basic_publish(exchange='', routing_key='hello', body=sys.argv[1]) 
connection.close() 

當我運行這個腳本併發送消息時,在django runserver輸出上出現以下錯誤:

python2 manage.py runserver 
Performing system checks... 

System check identified some issues: 

WARNINGS: 
?: (1_8.W001) The standalone TEMPLATE_* settings were deprecated in Django 1.8 and the TEMPLATES dictionary takes precedence. You must put the values of the following settings into your default TEMPLATES dict: TEMPLATE_DIRS, TEMPLATE_LOADERS. 

System check identified 1 issue (0 silenced). 
August 24, 2017 - 10:06:04 
Django version 1.11.4, using settings 'jarvice_channels.settings' 
Starting Channels development server at http://127.0.0.1:8000/ 
Channel layer default (asgi_rabbitmq.core.RabbitmqChannelLayer) 
Quit the server with CONTROL-C. 
2017-08-24 10:06:04,788 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive 
2017-08-24 10:06:04,789 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive 
2017-08-24 10:06:04,790 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive 
2017-08-24 10:06:04,790 - INFO - worker - Listening on channels hello, http.request, websocket.connect, websocket.disconnect, websocket.receive 
2017-08-24 10:06:04,792 - INFO - server - HTTP/2 support not enabled (install the http2 and tls Twisted extras) 
2017-08-24 10:06:04,792 - INFO - server - Using busy-loop synchronous mode on channel layer 
2017-08-24 10:06:04,792 - INFO - server - Listening on endpoint tcp:port=8000:interface=127.0.0.1 
Exception in thread Thread-4: 
Traceback (most recent call last): 
    File "/usr/lib64/python2.7/threading.py", line 801, in __bootstrap_inner 
    self.run() 
    File "/home/rep/rmqtest_env/lib/python2.7/site-packages/channels/management/commands/runserver.py", line 175, in run 
    worker.run() 
    File "/home/rep/rmqtest_env/lib/python2.7/site-packages/channels/worker.py", line 87, in run 
    channel, content = self.channel_layer.receive_many(channels, block=True) 
    File "/home/rep/rmqtest_env/lib/python2.7/site-packages/asgiref/base_layer.py", line 43, in receive_many 
    return self.receive(channels, block) 
    File "/home/rep/rmqtest_env/lib/python2.7/site-packages/asgi_rabbitmq/core.py", line 822, in receive 
    return future.result() 
    File "/home/rep/rmqtest_env/lib/python2.7/site-packages/concurrent/futures/_base.py", line 429, in result 
    return self.__get_result() 
    File "/home/rep/rmqtest_env/lib/python2.7/site-packages/concurrent/futures/_base.py", line 381, in __get_result 
    raise exception_type, self._exception, self._traceback 
ExtraData: unpack(b) received extra data. 

所以消息正在通過,但反序列化在某種程度上失敗了...... 什麼給了?

django渠道的東西是否期望一種特定的消息格式?

回答

2

的問題是,需要在被髮送的消息使用msgpack(和反射鏡中的django通道消息的結構)進行包裝,這樣的:

import msgpack 
import pika 

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 

channel.basic_publish(exchange='chat', 
        routing_key='external_or_whatever_you_desire', 
        body=msgpack.packb({'text': "Hello World!"}) 
       ) 
channel.close() 

自然地,交換應該匹配的一個在RabbitMQ上進行交流(如果您使用上面的聊天示例應該可以)。