我有一個應用程序,其中每個websocket連接(龍捲風打開回調內)創建一個zmq.SUB
套接字到現有zmq.FORWARDER
設備。想法是從zmq接收數據作爲回調,然後可以通過websocket連接中繼到前端客戶端。附加ZMQStream與現有的龍捲風ioloop
https://gist.github.com/abhinavsingh/6378134
ws.py
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()
from tornado.websocket import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
ioloop = IOLoop.instance()
class ZMQPubSub(object):
def __init__(self, callback):
self.callback = callback
def connect(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect('tcp://127.0.0.1:5560')
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.callback)
def subscribe(self, channel_id):
self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)
class MyWebSocket(WebSocketHandler):
def open(self):
self.pubsub = ZMQPubSub(self.on_data)
self.pubsub.connect()
self.pubsub.subscribe("session_id")
print 'ws opened'
def on_message(self, message):
print message
def on_close(self):
print 'ws closed'
def on_data(self, data):
print data
def main():
application = Application([(r'/channel', MyWebSocket)])
application.listen(10001)
print 'starting ws on port 10001'
ioloop.start()
if __name__ == '__main__':
main()
forwarder.py
import zmq
def main():
try:
context = zmq.Context(1)
frontend = context.socket(zmq.SUB)
frontend.bind('tcp://*:5559')
frontend.setsockopt(zmq.SUBSCRIBE, '')
backend = context.socket(zmq.PUB)
backend.bind('tcp://*:5560')
print 'starting zmq forwarder'
zmq.device(zmq.FORWARDER, frontend, backend)
except KeyboardInterrupt:
pass
except Exception as e:
logger.exception(e)
finally:
frontend.close()
backend.close()
context.term()
if __name__ == '__main__':
main()
publish.py
import zmq
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect('tcp://127.0.0.1:5559')
socket.send('session_id helloworld')
print 'sent data for channel session_id'
但是,我的ZMQPubSub
類似乎並沒有收到任何數據。
我繼續進行實驗,並意識到,我需要內ZMQPubSub
註冊on_recv
回調之後調用ioloop.IOLoop.instance().start()
。但是,這隻會阻止執行。
我也試過將main.ioloop
實例傳遞給ZMQStream
的構造函數,但是也沒有幫助。
是否存在被我可以綁定ZMQStream
現有main.ioloop
實例,不需要內MyWebSocket.open
阻塞流的方法嗎?
您使用的是什麼pyzmq和龍捲風版本?我只是用兩者的當前主人測試你的代碼,而PubSub對象的確在接收消息。 – minrk
'龍捲風== 3.1''pyzmq == 13.1.0' –
@minrk我已經更新了示例代碼,以反映在我的結尾正在嘗試的是什麼。你是否能夠在你的最後運行這個示例代碼並在'on_data'回調中接收數據? –