我一直在玩pyzmq和簡單的負載平衡使用HWM,我不明白我看到的行爲。試圖瞭解zeromq高水位行爲
我已經設置了一個簡單的多線程測試,一個DEALER客戶端通過ROUTER到DEALER模式連接到兩個工人。 HWM設置爲1.其中一名工作人員非常快,另一名工作非常緩慢,而且所有客戶端都會向服務器發送100封垃圾郵件。這通常似乎起作用,而且速度更快的工作人員比慢速工作人員處理更多的消息。
然而,即使我將緩慢的工作者設置得如此緩慢,以至於快速工作者應該能夠在緩慢的工作者完成一個工作之前處理99條消息,但緩慢的工作者似乎仍然接受至少2或3個消息。
高水位行爲不準確還是我錯過了什麼?
服務器代碼如下:
import re, sys, time, string, zmq, threading, signal
def worker_routine(worker_url, worker_id, context=None):
# socket to talk to dispatcher
context = context or zmq.Context.instance()
socket = context.socket(zmq.REP)
socket.set_hwm(1)
socket.connect(worker_url)
print "worker ", worker_id, " ready ..."
while True:
x = socket.recv()
if worker_id==1:
time.sleep(3)
print worker_id, x
sys.stdout.flush()
socket.send(b'world')
context = zmq.Context().instance()
# socket facing clients
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# socket facing services
backend = context.socket(zmq.DEALER)
url_worker = "inproc://workers"
backend.set_hwm(1)
backend.bind(url_worker)
# launch pool of worker threads
for i in range(2):
thread = threading.Thread(target=worker_routine, args=(url_worker,i,))
thread.start()
time.sleep(0.1)
try:
zmq.device(zmq.QUEUE, frontend, backend)
except:
print "terminating!"
# we never get here
frontend.close()
backend.close()
context.term()
客戶端代碼如下:
import zmq, random, string, time, threading, signal
# prepare our context and sockets
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://localhost:5559")
inputs = [''.join(random.choice(string.ascii_lowercase) for x in range(12)) for y in range(100)]
for x in xrange(100):
socket.send_multipart([b'', str(x)])
print "finished!"
輸出示例:
...
0 81
0 82
0 83
0 84
0 85
0 86
0 87
0 88
0 89
0 90
0 91
0 92
0 93
0 94
0 95
0 96
0 97
0 98
0 99
1 1
1 3
1 5