2014-01-25 79 views
3

我一直在玩pyzmq和簡單的負載平衡使用HWM,我不明白我看到的行爲。試圖瞭解zeromq高水位行爲

我已經設置了一個簡單的多線程測試,一個DEALER客戶端通過ROUTER到DEALER模式連接到兩個工人。 HWM設置爲1.其中一名工作人員非常快,另一名工作非常緩慢,而且所有客戶端都會向服務器發送100封垃圾郵件。這通常似乎起作用,而且速度更快的工作人員比慢速工作人員處理更多的消息。

然而,即使我將緩慢的工作者設置得如此緩慢,以至於快速工作者應該能夠在緩慢的工作者完成一個工作之前處理99條消息,但緩慢的工作者似乎仍然接受至少2或3個消息。

高水位行爲不準確還是我錯過了什麼?

服務器代碼如下:

import re, sys, time, string, zmq, threading, signal 


def worker_routine(worker_url, worker_id, context=None): 
    # socket to talk to dispatcher 
    context = context or zmq.Context.instance() 
    socket = context.socket(zmq.REP) 
    socket.set_hwm(1) 
    socket.connect(worker_url) 

    print "worker ", worker_id, " ready ..." 
    while True: 
     x = socket.recv() 

     if worker_id==1: 
      time.sleep(3) 

     print worker_id, x 
     sys.stdout.flush() 

     socket.send(b'world') 


context = zmq.Context().instance() 
# socket facing clients 
frontend = context.socket(zmq.ROUTER) 
frontend.bind("tcp://*:5559") 
# socket facing services 
backend = context.socket(zmq.DEALER) 
url_worker = "inproc://workers" 
backend.set_hwm(1) 
backend.bind(url_worker) 

# launch pool of worker threads 
for i in range(2): 
    thread = threading.Thread(target=worker_routine, args=(url_worker,i,)) 
    thread.start() 
    time.sleep(0.1) 

try: 
    zmq.device(zmq.QUEUE, frontend, backend) 
except: 
    print "terminating!" 

# we never get here 
frontend.close() 
backend.close() 
context.term() 

客戶端代碼如下:

import zmq, random, string, time, threading, signal 

# prepare our context and sockets 
context = zmq.Context() 
socket = context.socket(zmq.DEALER) 
socket.connect("tcp://localhost:5559") 

inputs = [''.join(random.choice(string.ascii_lowercase) for x in range(12)) for y in range(100)] 

for x in xrange(100): 
    socket.send_multipart([b'', str(x)]) 

print "finished!" 

輸出示例:

... 
0 81 
0 82 
0 83 
0 84 
0 85 
0 86 
0 87 
0 88 
0 89 
0 90 
0 91 
0 92 
0 93 
0 94 
0 95 
0 96 
0 97 
0 98 
0 99 
1 1 
1 3 
1 5 

回答

3

顯然ZeroMQ異步從發送發送消息()調用。也就是說,當send()返回時,消息尚未發送或添加到內部隊列中。如果發送速度足夠快,下次您發送郵件時,郵件仍然沒有添加到隊列中,因此尚未達到水印。你可以在隊列中添加數十或數百條消息,達到水印,並阻止發送行爲。

換句話說,嘗試在send()後休眠幾分之一秒和看看會發生什麼,它應該給消息添加到隊列上足夠的時間,所以到下一次發送時,它能夠看到水印已經到達。