我注意到一個zeromq PUB套接字緩衝所有傳出的數據,如果它是連接,例如ZeroMQ PUB套接字緩衝區我所有的傳出數據時,它連接
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
print sub.recv()
子這些消息後結合,他們應該被刪除,因爲如果沒有人連接到它,PUB應該放棄消息。但不是丟棄消息,而是緩存所有消息。
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi
正如你所看到的,那些「消息不應該被丟棄」是由插座緩衝,一旦它被連接時,它們刷新到SUB插口。如果我在PUB套接字處綁定,並在SUB套接字處連接,則它可以正常工作。
import zmq
import time
context = zmq.Context()
# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
pub.send('a message should not be dropped')
time.sleep(1)
# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")
time.sleep(1)
# this is the only message we should see in SUB
pub.send('hi')
while True:
print repr(sub.recv())
而且你只能看到輸出
'hi'
這種奇怪的行爲引起一個問題,它緩存的連接插座上的所有數據,我有兩臺服務器,服務器A將數據發佈到服務器乙
Server A -- publish --> Server B
如果服務器B上線後,它工作正常。但是如果我啓動服務器A並且不啓動服務器B呢?
其結果是,在服務器A上的連接插座PUB保持所有這些數據,內存使用率變高。
這裏的問題是,這種行爲是不是一個錯誤或功能?如果是功能,我在哪裏可以找到提及此行爲的文檔?我怎樣才能停止連接PUB套接字緩衝所有數據?
謝謝。
我知道我可以設置HWM來限制緩衝區中的消息號。但是它並沒有解決問題,PUB處理HWM狀態的方式就是刪除新消息。這意味着如果您設置HWM,則只有前導消息保存在緩衝區中。我正在寫的是音頻流媒體系統。這種行爲使得使用起來非常煩人。比方說,你發送消息[1,2,3,4],然後HWM被設置爲2,那麼套接字會爲你緩衝[1,2],所有新消息都會被丟棄。但對於音頻流,最重要的部分是新的數據。有什麼方法可以調整HWM如何丟棄信息? –
啊,所以你的意思是你想要的行爲是,如果HWM設置爲2並且你發送[1,2,3,4],那麼它應該[1,2]並且保持[3,4],但是然後如果你發送了5,它應該減少3,並最終以[4,5]結束?我不認爲ZMQ中存在這種行爲。 – aculich
這非常有趣。當然,有些應用程序需要刪除「舊」消息的能力(IP電話是一個常見的例子)。 –