1
如何從一個自定義manage.py命令將消息發送到一個Django消費者如何從一個命令將消息發送到一個頻道
from django.core.management.base import BaseCommand, CommandError
from channels import Channel
class Command(BaseCommand):
help = 'Sends a message to a Django channel from the thing'
def add_arguments(self, parser):
parser.add_argument('json_object', nargs='+', type=str)
def handle(self, *args, **options):
self.stdout.write("TEST !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
print Channel("test").channel_layer
Channel("test").send({'op':options['json_object'][0]})
這是我的消費者
class MyConsumer(WebsocketConsumer):
@classmethod
def channel_names(self):
return {"test"}
def connection_groups(self):
return ["test"]
def dispatch(self, message, **kwargs):
return self.get_handler(message, **kwargs)(message, **kwargs)
def get_handler(self, message, **kwargs):
channel_routing = [
consumers.MyConsumer.as_route(path=r"^/test/"),
route("test.receive", consumers.chat_join),
] for _filter, value in kwargs.items():
filter_mapping = getattr(self, _filter + '_mapping', None)
if not filter_mapping:
continue
consumer = getattr(self, filter_mapping.get(value), None)
if consumer:
return consumer
raise ValueError('Message')
def connect(self,message):
self.message.reply_channel.send({"accept": True})
def receive(self,text=None, bytes= None):
print text
def disconnect(self,message):
pass
當我嘗試但是運行命令,我得到這個消息
2017年3月8日03:45:33839 - 錯誤 - 工人 - 無法找到匹配的消息上測試!檢查你的路由。
如果它是貼切的,這裏是我的路由
channel_routing = [
consumers.MyConsumer.as_route(path=r"^/test/"),
]
我有一個非常類似的問題的原因,你有沒有設法解決這個問題? – kunambi
我實際上創建了RedisChannelLayer的自定義版本並覆蓋了這些方法 – cjds