2013-04-11 56 views
2

我目前正在探索測試我的zeromq應用程序的可能性。我的印象是,我可以在同一個線程中擁有一個發佈者/訂閱者,讓發佈者發佈,並且訂閱者訂閱它而不會丟失消息。但是,當我讓發佈者發送一些消息時,沒有任何消息傳遞給訂閱者。來自單線程的ZeroMQ進程間通信丟失消息

這裏是我使用的代碼:

import zmq 

def main(): 
    ctx = zmq.Context.instance() 
    sender = ctx.socket(zmq.PUB) 
    sender.setsockopt(zmq.HWM, 1000) 
    sender.bind('tcp://*:10001') 

    rcvr = ctx.socket(zmq.SUB) 
    rcvr.setsockopt(zmq.HWM, 1000) 
    rcvr.connect('tcp://127.0.0.1:10001') 
    rcvr.setsockopt(zmq.SUBSCRIBE, "") 

    for i in range(100): 
     sender.send('%i' % i) 

    while True: 
     try: 
      print rcvr.recv(zmq.NOBLOCK) 
     except zmq.ZMQError: 
      break 


if __name__ == '__main__': 
    main() 

當運行這個,我沒有得到任何輸出。

讓我感到震驚的是接收方在發送方發送之前已連接,因此應排隊這些消息。或者,這是一個錯誤的假設,我應該用PUSH/PULL來代替?

+1

查看[guide](http://zguide.zeromq.org/page:all#Getting-the-Message-Out)並搜索** slow joiner **。 – 2013-04-20 15:22:12

回答

1

您應該將SUB套接字連接到端口10000而不是10001.目前,SUB套接字正在等待發布者,並且PUB套接字正在等待訂閱者。 0mq允許'客戶端'在沒有'服務器'的情況下連接的功能還意味着在連接到端口10001時沒有錯誤發生,並且這是設計的。

+0

發現它的好工作!然而,這是一個錯字,並沒有解決問題。 :( – bayer 2013-04-15 08:27:51

0

什麼打動我的是,發送方發送之前

這實際上並不是真正的連接接收器 - 接收器已開始在連接過程,但這並不意味着過程已完成。連接是異步的。

如果您在實際使用此爲內部進程通信,我會建議使用inproc運輸,其中這不是一個問題:

url = 'inproc://whatever' 
sender.bind(url) 
... 
recvr.connect(url) 
+0

然而,沒有理由爲什麼它不應該與TCP協同工作,是嗎? – bayer 2013-04-16 07:18:23

+0

其他我說的非Inproc連接是異步的,所以你開始發送之前實際上有一個對等連接,因此消息正在被丟棄,如果你在發送之前有一個睡眠,它可以正常工作。 – minrk 2013-04-17 04:21:14

2

我覺得這是緩慢的木匠問題的情況下,描述在ZeroMQ guide

這種「緩慢的木匠」症狀足以讓足夠多的人受到足夠的關注,我們將詳細解釋它。

我相信主要的問題是所有的信息都是在用戶套接字開始偵聽之前發送的,並且這些信息通過並丟棄。在設置套接字和發送消息之間延遲不起作用,因爲在接收器開始收聽之前已經發送了最後的消息。

正如您所建議的,推/拉套接字會將內存中的作業排隊。你可以把插座之間的工作在一個單一的過程是這樣

# pushpull.py 
import zmq 

def main(): 
    ctx = zmq.Context() 
    sender = ctx.socket(zmq.PUSH) 
    sender.bind('tcp://*:10001') 

    rcvr = ctx.socket(zmq.PULL) 
    rcvr.connect('tcp://127.0.0.1:10001') 

    for i in range(100): 
     sender.send_unicode('%i' % i) 

    while True: 
     msg = rcvr.recv() 
     print(msg) 

if __name__ == '__main__': 
    main() 

或者,如果你想使用的pub/sub插座,我們需要兩個過程和插座的設置和消息歡送之間的time.sleep(1)

首先啓動接收器

# rcvr.py 
import zmq 

def main(): 
    ctx = zmq.Context() 
    rcvr = ctx.socket(zmq.SUB) 
    rcvr.connect('tcp://127.0.0.1:10001') 
    rcvr.setsockopt_string(zmq.SUBSCRIBE, "") 

    while True: 
     msg = rcvr.recv() 
     print(msg) 

if __name__ == '__main__': 
    main() 

然後寄件人,

# sender.py 
import zmq 
import time 

def main(): 
    ctx = zmq.Context() 
    sender = ctx.socket(zmq.PUB) 
    sender.bind('tcp://*:10001') 

    time.sleep(1) 
    for i in range(100): 
     sender.send_unicode('%i' % i) 

if __name__ == "__main__": 
    main() 

接受:

b'0' 
b'1' 
b'2' 
b'3' ... 

我在Python 3.3工作,並在與真棒WinPython分配的那一刻pyzmq 13.1.0,所以一些字符串中的ZMQ處理的要求是有點不同,以及打印功能。 希望它有幫助。