2014-09-24 94 views
3

我正在嘗試使用zero-mq.My的要求非常簡單。我希望能夠在網絡中的兩個對等體之間進行通信。我遇到了此程序書中的例子。zero-mq:socket.recv()調用阻塞

$ pub_server.py

import zmq 
import random 
import sys 
import time 

port = "5556" 
if len(sys.argv) > 1: 
    port = sys.argv[1] 
    int(port) 

context = zmq.Context() 
socket = context.socket(zmq.PUB) 
socket.bind("tcp://*:%s" % port) 

while True: 
    topic = random.randrange(9999,10005) 
    messagedata = random.randrange(1,215) - 80 
    print "%d %d" % (topic, messagedata) 
    socket.send("%d %d" % (topic, messagedata)) 
    time.sleep(1) 

$sub_client.py

import sys 
import zmq 

port = "5556" 
if len(sys.argv) > 1: 
    port = sys.argv[1] 
    int(port) 

# Socket to talk to server 
context = zmq.Context() 
socket = context.socket(zmq.SUB) 

print "Collecting updates from weather server..." 
socket.connect ("tcp://localhost:%s" % port) 

# Subscribe to zipcode, default is NYC, 10001 
topicfilter = "10001" 
socket.setsockopt(zmq.SUBSCRIBE, topicfilter) 

# Process 5 updates 
total_value = 0 
for update_nbr in range (5): 
    string = socket.recv() 
    topic, messagedata = string.split() 
    total_value += int(messagedata) 
    print ('{} {}'.format(topic, messagedata)) 

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/update_nbr))) 

我有這種模式的問題是,

string = socket.recv()

塊,直到我收到一個message.I不想這事發生了,我暈t消息在接收方排隊,這樣我就可以把它從隊列中排除(或類似的東西)

在zero-mq中有一些模型允許這麼做嗎?

+1

使用'zmq.Poller':HTTP: //stackoverflow.com/a/7540299/202775 – tuomur 2014-09-24 10:07:14

回答

5

zmq.Socket.recv如果您通過zmq.NOBLOCK標誌參數將不會阻止。

的文檔說:它接收和一個消息將對於此後ZMQError升高每個的recv()調用,直到該隊列耗盡返回

If NOBLOCK is set, this method will raise a ZMQError with EAGAIN if a message is not ready. 

ZMQ將排隊的消息。

zmq.Again在以下例子中使用的是wrapper for zmq.EAGAIN

例如:

while True: 
    try: 
     #check for a message, this will not block 
     message = socket.recv(flags=zmq.NOBLOCK) 

     #a message has been received 
     print "Message received:", message 

    except zmq.Again as e: 
     print "No message received yet" 

    # perform other important stuff 
    time.sleep(10) 

sub_client.py例子或許可以寫入使用非阻塞的行爲是這樣的:

在這個答案描述
import sys, time 
import zmq 

port = "5556" 
if len(sys.argv) > 1: 
    port = sys.argv[1] 
    int(port) 

# Socket to talk to server 
context = zmq.Context() 
socket = context.socket(zmq.SUB) 

print "Collecting updates from weather server..." 
socket.connect ("tcp://localhost:%s" % port) 

# Subscribe to zipcode, default is NYC, 10001 
topicfilter = "10001" 
socket.setsockopt(zmq.SUBSCRIBE, topicfilter) 

# Process 5 updates 
total_value = 0 
received_value_count = 0 
do_receive_loop = True 
while do_receive_loop: 
    try: 
     #process all messages waiting on subscribe socket 
     while True: 
      #check for a message, this will not block 
      string = socket.recv(flags=zmq.NOBLOCK) 

      #message received, process it 
      topic, messagedata = string.split() 
      total_value += int(messagedata) 
      print ('{} {}'.format(topic, messagedata)) 

      #check if we have all the messages we want 
      received_value_count += 1 
      if received_value_count > 4: 
       do_receive_loop = False 
       break 

    except zmq.Again as e: 
     #No messages waiting to be processed 
     pass 

    #Here we can do other stuff while waiting for messages 
    #contemplate answer to 'The Last Question' 
    time.sleep(15) 
    print "INSUFFICIENT DATA FOR MEANINGFUL ANSWER" 

print('Avg data value for topic {} was {}'.format(topicfilter, (total_value/5)))