2014-02-14 146 views
1

我正在嘗試使用ZMQ構建多個發佈者/多訂戶拓撲。我創建了一個使用espresso.py示例的示例,對其進行了一些細微的修改。我想確保我所做的是正確的,因爲我對zeromq相當陌生。請隨時批評和評論。使用XPUB/XSUB的ZeroMQ多重發布者和訂閱者 - 這是一個正確的實現嗎?

我已經基本上吸取了一些教訓。

  • 一個ZMQ套接字可以綁定到只有在多個流程的一個端口到一個單一的網絡卡(又名普通插座)

  • 綁定並不意味着聽即你可以綁定後發出connect() (對於套接字開發人員來說非常混淆,但是這不是套接字)

  • 代理和XPUB/XSUB是用來作爲模式,當訂閱者不需要找出並連接到所有的發佈者。

我真的不喜歡關於下面的代碼是每個用戶綁定到一個單獨的插座。雖然這是一個必要的罪惡,但不知何故,我一直認爲這看起來不正確。

所以這裏是我的示例代碼。

# Espresso Pattern 
# This shows how to capture data using a pub-sub proxy 
# 

import time 

from random import randint 
from string import uppercase 
from threading import Thread 

import zmq 
from zmq.devices import monitored_queue 

from zhelpers import zpipe 

# The subscriber thread requests messages starting with 
# A and B, then reads and counts incoming messages. 


def subscriber_thread(): 
    ctx = zmq.Context.instance() 

    # Subscribe to "A" and "B" 
    subscriber = ctx.socket(zmq.SUB) 
    subscriber.connect("tcp://localhost:6001") 
    subscriber.setsockopt(zmq.SUBSCRIBE, b"A") 
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B") 

    count = 0 
    while True: 
     try: 
      msg = subscriber.recv_multipart() 
     except zmq.ZMQError as e: 
      if e.errno == zmq.ETERM: 
       break   # Interrupted 
      else: 
       raise 
     count += 1 

    print ("Subscriber received %d messages" % count) 


# .split publisher thread 
# The publisher sends random messages starting with A-J: 

def publisher_thread(port, char): 
    ctx = zmq.Context.instance() 

    publisher = ctx.socket(zmq.PUB) 
    publisher.bind("tcp://*:"+str(port)) 

    while True: 
     string = "%s-%05d" % (char, randint(port, port+500)) 
     try: 
      publisher.send(string) 
     except zmq.ZMQError as e: 
      if e.errno == zmq.ETERM: 
       break   # Interrupted 
      else: 
       raise 
     time.sleep(0.1)   # Wait for 1/10th second 

# .split listener thread 
# The listener receives all messages flowing through the proxy, on its 
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects 
# attached child threads via inproc. In other languages your mileage may vary: 

def listener_thread(pipe): 

    # Print everything that arrives on pipe 
    while True: 
     try: 
      print (pipe.recv_multipart()) 
     except zmq.ZMQError as e: 
      if e.errno == zmq.ETERM: 
       break   # Interrupted 


# .split main thread 
# The main task starts the subscriber and publisher, and then sets 
# itself up as a listening proxy. The listener runs as a child thread: 

def main(): 

    # Start child threads 
    ctx = zmq.Context.instance() 
    p_thread1 = Thread(target=publisher_thread, args=(6000,'A')) 
    p_thread2 = Thread(target=publisher_thread, args=(7000,'B')) 
    s_thread = Thread(target=subscriber_thread) 
    p_thread1.start() 
    p_thread2.start() 
    s_thread.start() 

    pipe = zpipe(ctx) 

    subscriber = ctx.socket(zmq.XSUB) 
    subscriber.connect("tcp://localhost:6000") 
    subscriber.connect("tcp://localhost:7000") 

    publisher = ctx.socket(zmq.XPUB) 
    publisher.bind("tcp://*:6001") 

    l_thread = Thread(target=listener_thread, args=(pipe[1],)) 
    l_thread.start() 

    try: 
     monitored_queue(subscriber, publisher, pipe[0], 'pub', 'sub') 
    except KeyboardInterrupt: 
     print ("Interrupted") 

    del subscriber, publisher, pipe 
    ctx.term() 

if __name__ == '__main__': 
    main() 
+1

好現在我很困惑。上面的代碼實際上是不正確的。根據XPUB/XSUB文檔,它應該是XPUB/XSUB端的bind(),而訂戶和發佈者都應該使用connect()(代碼連接PDF上的第1卷,第48頁)。我再也不能在這裏上傳代碼,但你明白了。 – vivekv

回答

2

我在ZeroMQ github頁面上提出了一個問題,並得到了回覆。這是ZeroMQ中的一個已知錯誤,這是由於訂閱消息的接收方完全準備好之前發佈和訂閱發生在提出訂閱請求的不同線程中。更多詳細信息可以在這裏找到。

https://github.com/zeromq/libzmq/issues/897

我試圖模仿這裏的問題

https://gist.github.com/vivekfantain/9021979

分享這一切都爲別人誰在同樣的問題絆倒。

+0

更重要的一點 - XPUB/XSUB在ZMQ 2.xx版本中被嚴重破壞。我設法讓它只與4.x版本一起工作。 – vivekv

相關問題