2013-08-28 54 views
3

我有一個應用程序,其中每個websocket連接(龍捲風打開回調內)創建一個zmq.SUB套接字到現有zmq.FORWARDER設備。想法是從zmq接收數據作爲回調,然後可以通過websocket連接中繼到前端客戶端。附加ZMQStream與現有的龍捲風ioloop

https://gist.github.com/abhinavsingh/6378134

ws.py

import zmq 
from zmq.eventloop import ioloop 
from zmq.eventloop.zmqstream import ZMQStream 
ioloop.install() 

from tornado.websocket import WebSocketHandler 
from tornado.web import Application 
from tornado.ioloop import IOLoop 
ioloop = IOLoop.instance() 

class ZMQPubSub(object): 

    def __init__(self, callback): 
     self.callback = callback 

    def connect(self): 
     self.context = zmq.Context() 
     self.socket = self.context.socket(zmq.SUB) 
     self.socket.connect('tcp://127.0.0.1:5560') 
     self.stream = ZMQStream(self.socket) 
     self.stream.on_recv(self.callback) 

    def subscribe(self, channel_id): 
     self.socket.setsockopt(zmq.SUBSCRIBE, channel_id) 

class MyWebSocket(WebSocketHandler): 

    def open(self): 
     self.pubsub = ZMQPubSub(self.on_data) 
     self.pubsub.connect() 
     self.pubsub.subscribe("session_id") 
     print 'ws opened' 

    def on_message(self, message): 
     print message 

    def on_close(self): 
     print 'ws closed' 

    def on_data(self, data): 
     print data 

def main(): 
    application = Application([(r'/channel', MyWebSocket)]) 
    application.listen(10001) 
    print 'starting ws on port 10001' 
    ioloop.start() 

if __name__ == '__main__': 
    main() 

forwarder.py

import zmq 

def main(): 
    try: 
     context = zmq.Context(1) 

     frontend = context.socket(zmq.SUB) 
     frontend.bind('tcp://*:5559') 
     frontend.setsockopt(zmq.SUBSCRIBE, '') 

     backend = context.socket(zmq.PUB) 
     backend.bind('tcp://*:5560') 

     print 'starting zmq forwarder' 
     zmq.device(zmq.FORWARDER, frontend, backend) 
    except KeyboardInterrupt: 
     pass 
    except Exception as e: 
     logger.exception(e) 
    finally: 
     frontend.close() 
     backend.close() 
     context.term() 

if __name__ == '__main__': 
    main() 

publish.py

import zmq 

if __name__ == '__main__': 
    context = zmq.Context() 
    socket = context.socket(zmq.PUB) 
    socket.connect('tcp://127.0.0.1:5559') 
    socket.send('session_id helloworld') 
    print 'sent data for channel session_id' 

但是,我的ZMQPubSub類似乎並沒有收到任何數據。

我繼續進行實驗,並意識到,我需要內ZMQPubSub註冊on_recv回調之後調用ioloop.IOLoop.instance().start()。但是,這隻會阻止執行。

我也試過將main.ioloop實例傳遞給ZMQStream的構造函數,但是也沒有幫助。

是否存在被我可以綁定ZMQStream現有main.ioloop實例,不需要內MyWebSocket.open阻塞流的方法嗎?

+0

您使用的是什麼pyzmq和龍捲風版本?我只是用兩者的當前主人測試你的代碼,而PubSub對象的確在接收消息。 – minrk

+0

'龍捲風== 3.1''pyzmq == 13.1.0' –

+0

@minrk我已經更新了示例代碼,以反映在我的結尾正在嘗試的是什麼。你是否能夠在你的最後運行這個示例代碼並在'on_data'回調中接收數據? –

回答

4

在你現在完整的例子,只需在您的貨代改變frontend到PULL套接字和發佈商合作插座推,並且你希望它應該做的。

是與此有關的插座選擇的一般原則是:當你想發送消息給大家誰是準備接受它(可能沒有之一)

  • 使用PUB/SUB
  • 使用推/拉當你想將消息發送到只有一個同行,等着他們做好準備

開始時可能會認爲你只是想PUB-SUB,但一旦你開始看每個插座對,你意識到他們是非常不同的。 frontend-websocket連接肯定是PUB-SUB - 你可能有零對多的接收器,而你只是想要發送消息給每個發生消息時恰好可用的消息。但後端方面不同 - 只有一個接收方,並且它肯定希望來自發布者的每條消息。

所以你有它 - 後端應該是PULL和前端PUB。您的所有插座:

PUSH -> [PULL-PUB] -> SUB 

publisher.py:插座PUSH,在device.py連接到backend

forwarder.py:backendPULLfrontendPUB ws.py:SUB連接和訂閱到forwarder.frontend

在您的情況下,使PUB/SUB在後端失敗的相關行爲是慢喬治病綜合徵,這是described in The Guide。從本質上講,訂閱者只需花費有限的時間來告訴發佈者關於訂閱的信息,因此如果你在打開PUB套接字後立即發送消息,可能性還不清楚它是否有訂閱者,所以它只是放棄消息。

+0

我在最後回覆中對我的解釋可能不太清楚。我看到你建議用拖纜(推/拉)設備替換貨運車(pub/sub)(如果錯誤的話糾正我)。但是,我的用例需要基於pub/sub的解決方案。每個websocket連接默認訂閱至少3個通道a)'all_channels' b)'$ {session_id} _channels' c)'$ {session_id} _channel _ $ {tab_id}'。在實踐中,c)可以利用推/拉,但a)和b)都需要pub/sub策略。說得通? –

+0

不,我不是建議用PUSH/PULL設備替換它 - 只需用PULL替換設備中的SUB,然後用PUSH替換髮布服務 - 這樣,整個鏈就是:'PUSH - > PULL-PUB - > SUB-websocket'。你有一個PULL-PUB設備,而不是PULL-PUSH或SUB-PUB。 – minrk

1

ZeroMq用戶必須訂閱他們希望收到的消息;我沒有看到你的代碼。我認爲Python的方法是這樣的:

self.socket.setsockopt(zmq.SUBSCRIBE, "") 
+0

我忘了在上面的問題中添加那一點。 Websocket連接確實訂閱了一組channel_id。我更新了上面的代碼以反映相同的情況。 –