2012-01-21 31 views
13

我注意到一個zeromq PUB套接字緩衝所有傳出的數據,如果它是連接,例如ZeroMQ PUB套接字緩衝區我所有的傳出數據時,它連接

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.connect("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.bind("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print sub.recv() 

子這些消息後結合,他們應該被刪除,因爲如果沒有人連接到它,PUB應該放棄消息。但不是丟棄消息,而是緩存所有消息。

a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
hi 

正如你所看到的,那些「消息不應該被丟棄」是由插座緩衝,一旦它被連接時,它們刷新到SUB插口。如果我在PUB套接字處綁定,並在SUB套接字處連接,則它可以正常工作。

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.bind("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.connect("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print repr(sub.recv()) 

而且你只能看到輸出

'hi' 

這種奇怪的行爲引起一個問題,它緩存的連接插座上的所有數據,我有兩臺服務器,服務器A將數據發佈到服務器乙

Server A -- publish --> Server B 

如果服務器B上線後,它工作正常。但是如果我啓動服務器A並且不啓動服務器B呢?

其結果是,在服務器A上的連接插座PUB保持所有這些數據,內存使用率變高。

這裏的問題是,這種行爲是不是一個錯誤或功能?如果是功能,我在哪裏可以找到提及此行爲的文檔?我怎樣才能停止連接PUB套接字緩衝所有數據?

謝謝。

回答

6

是否插座塊或滴劑消息取決於如在ZMQ::Socket documentation描述的插座型(下面重點是礦):

ZMQ :: HWM:檢索高水位標記

的ZMQ: :HWM的選擇應檢索的 指定套接字的高水位。高水位標記是未完成消息的最大數量的硬限制0MQ將在指定套接字與之通信的單個對等體的任何 內存中排隊。

如果已達到極限插座應進入一個特殊的 狀態,並根據套接字類型,0MQ應採取適當行動 如阻塞或刪除發送的消息。有關每種套接字類型所採取的確切的 動作的詳細信息,請參閱ZMQ :: Socket中的單個套接字說明的 。

零的默認ZMQ :: HWM值表示「無限制」。

你可以看它是否會阻止或通過的說明文件,插槽類型爲ZMQ::HWM option action這要麼是BlockDrop看下降。

ZMQ::PUB的動作是Drop,所以如果它是不會放棄你應該檢查HWM(高水位)值,並聽取這一缺省值爲零ZMQ :: HWM值意味着「無極限」的警告,這意味着在系統內存不足之前它不會進入異常狀態(在這一點上我不知道它是如何表現的)。

+0

我知道我可以設置HWM來限制緩衝區中的消息號。但是它並沒有解決問題,PUB處理HWM狀態的方式就是刪除新消息。這意味着如果您設置HWM,則只有前導消息保存在緩衝區中。我正在寫的是音頻流媒體系統。這種行爲使得使用起來非常煩人。比方說,你發送消息[1,2,3,4],然後HWM被設置爲2,那麼套接字會爲你緩衝[1,2],所有新消息都會被丟棄。但對於音頻流,最重要的部分是新的數據。有什麼方法可以調整HWM如何丟棄信息? –

+0

啊,所以你的意思是你想要的行爲是,如果HWM設置爲2並且你發送[1,2,3,4],那麼它應該[1,2]並且保持[3,4],但是然後如果你發送了5,它應該減少3,並最終以[4,5]結束?我不認爲ZMQ中存在這種行爲。 – aculich

+0

這非常有趣。當然,有些應用程序需要刪除「舊」消息的能力(IP電話是一個常見的例子)。 –

0

所以結合()和連接()導致兩個不同的行爲。你爲什麼不只是選擇你喜歡哪一個(好像綁定())和使用?

它確實是ZeroMQ的一個特性,一般來說,它將緩衝出去的消息直到建立連接。

+0

因爲我有多個節點想要將數據發佈到一個衆所周知的服務器。當然,我可以綁定在PUB端,但結果是,我需要每個節點有N個地址,服務器不知道會有多少個節點。我認爲綁定和連接不應該影響行爲,一旦建立連接,綁定和連接沒有區別,那麼爲什麼會有所不同?我不明白:S –

+0

哦,好的。那麼我認爲ZeroMQ的行爲和預期的一樣,因此您可能只需在發送數據之前查詢連接。 –

+0

@JohnZwinck選擇'bind()'與'connect()'不是基於偏好,而應該基於它的使用方式。他通過服務器(發佈者)上的bind()和客戶端(訂戶)上的connect()來正確使用它。它並不總是緩衝傳出的消息,而是由套接字類型和高水位標記的值來確定[參見文檔](http://stackoverflow.com/a/8958699/462302 )。 – aculich

0

您應該可以使用hwm settingom酒吧套接字在套接字中設置高水位標記。它可以讓你定義保存多少條消息。

1

他們在套接字上設置HWM選項。

4

我覺得這種行爲是zmq_connect()的語義。 也就是:當zmq_connect()返回成功,則連接在概念上建立,因此連接PUB開始排隊消息而不是丟棄

繼從「ZMQ Guide」摘錄了這樣的提示:

與ØMQ插座理論,無所謂哪端連接,並 哪一端結合。但是,對於PUB-SUB插座,如果綁定SUB 插座並連接PUB插座,則SUB插座可能會收到舊的 消息,即在SUB啓動之前發送的消息。 這是一個 綁定/連接工作方式的工件。如果可以,最好綁定PUB和 連接SUB。

zmq_connect(部分)具有一些提示,如下圖所示:

到常規插座

主要差異一般來說,常規的插座呈現同步 接口要麼面向連接可靠字節流 (SOCK_STREAM)或無連接不可靠數據報(SOCK_DGRAM)。 相比之下,ØMQ套接字提供了一個異步的 消息隊列的抽象,具體確切的排隊語義取決於使用中的 套接字類型。在傳統的套接字傳輸 字節或離散數據報的流時,ØMQ套接字傳輸離散消息。

ØMQ插座是異步意味着物理 連接建立的定時和拆除,重新連接和有效的遞送是 對用戶透明的,並通過ØMQ自身組織的。此外,消息 可以在對等體不可用以接收它們的情況下排隊。

0

這裏是一個黑客工具,可以幫助...

設置你的ZMQ::HWM到一個固定的數,如10。連接後,請在循環中調用用戶套接字的方法recv,直到它丟棄所有緩存的消息,然後才啓動主接收循環。