2016-03-10 104 views
0

考慮以下幾點:如何隊列連接到ZeroMQ PUB/SUB

  • 一組3個邏輯服務:S1S2和每個服務正在運行的S3
  • 兩個實例,所以我們有以下進程:S1P1S1P2S2P1S2P2S3P1S3P2
  • 一個ZeroMQ經紀人在單個進程中運行,並且可通過所有服務流程

一個合乎邏輯的服務,讓我們說S1,發佈消息M1是感興趣的邏輯服務S2S3。每個邏輯服務只有一個進程必須接收M1,所以我們假設S2P1S3P2

我曾嘗試以下,但沒有成功:

  • 經紀人線程1正在運行XSUB/XPUB代理
  • 經紀人線程2正在運行ROUTER/DEALER代理與連接到XPUBROUTER套接字和訂閱所有內容(對於邏輯S1
  • 代理線程3正在運行ROUTER/DEALER代理與ROUTER連接到XPUB插座和訂閱所有(邏輯S2
  • 經紀人螺紋4是運行ROUTER/DEALER代理與連接到XPUB插座ROUTER和訂閱所有(邏輯S3
  • 每個邏輯服務過程正在運行連接到代理DEALER插座

我盤算了一下,XSUB/XPUB代理會給我發佈/訂閱語義,而且ROUTER/DEALER代理將引入競爭是REP插座線補間XSUB/XPUB代理髮送的郵件的REP套接字。

我怎樣才能結合ZeroMQ插座來完成這個?

UPDATE1

我知道「沒有成功」是沒有幫助的,我已經嘗試了不同的配置,得到了不同的錯誤。我試過的最新配置如下:

(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP

的copyLoop是這樣的:

public void start() { 
    context = ZMQ.context(1); 

    subSocket = context.socket(ZMQ.SUB); 
    subSocket.connect(subSocketUrl); 
    subSocket.subscribe("".getBytes()); 

    reqSocket = context.socket(ZMQ.REQ); 
    reqSocket.connect(reqSocketUrl); 

    while (!Thread.currentThread().isInterrupted()) { 
     final Message msg = receiveNextMessage(); 
     resendMessage(msg); 
    } 
} 

private Message receiveNextMessage() { 
    final String header = subSocket.recvStr(); 
    final String entity = subSocket.recvStr(); 

    return new Message(header, entity); 
} 

private void resendMessage(Message msg) { 
    reqSocket.sendMore(msg.getKey()); 
    reqSocket.send(msg.getData(), 0); 
} 

的例外,我得到的是以下幾點:

java.lang.IllegalStateException: Cannot send another request 
    at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na] 
    at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na] 
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na] 
    at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na] 
    at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na] 
    at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na] 

我跑JeroMQ 0.3.4,Oracle Java 8 JVM和Windows 7.

+0

您可能想知道**'ZeroMQ'是無代理商設計** – user3666197

+0

當您說「沒有成功」時 - 您遇到了哪些問題?我會在答案中提出一個替代(但相似)的體系結構,但根據您遇到問題的位置可能沒有幫助。 – Jason

+0

@Jason你說得沒錯,這是因爲我嘗試了不同的組合,並得到了不同的例外。我已更新問題以反映最新的配置。 – Spiff

回答

2

你似乎要增加連接的複雜性 - 您應該能夠將所有連接直接連接到您的發佈商。

您當前運行到該錯誤是REQ插座有嚴格的消息排序模式 - 你不準send()連續兩次,你必須發送/接收/發送/接收/等(同樣,REP插座必須接收/發送/接收/發送/等)。從它看起來像,你只是在你的REQ套接字上發送/發送/發送/等沒有收到任何迴應。如果你不關心對方的迴應,那麼你必須收到並放棄它,或者使用DEALER(或ROUTER,但DEALER在當前圖表中更有意義)。

我已經創建了一個關於如何使用您的基本流程結構來完成以下架構的圖表。

Broker T1   Broker T2    Broker T3    Broker T4 
(PUB*)------>(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*) -->(*SUB)[--](DEALER*) 
     |_____________________||____|     || |     || 
     |_____________________||_______________________||____|     || 
          ||      ||      || 
    ========================||  ==================||   ===========||= 
    ||    ||    ||    ||    ||    || 
    ||    ||    ||    ||    ||    || 
    ||    ||    ||    ||    ||    || 
(REP*)   (REP*)   (REP*)   (REP*)   (REP*)   (REP*) 
S1P1   S1P2   S2P1   S2P2   S3P1   S3P2 

因此,主要的區別是,我已經拋棄你的(SUB copyLoop=> REQ)一步。無論您選擇XPUB/XSUB還是PUB/SUB都取決於您,但我會傾向於更簡單地開始,除非您當前想要使用XPUB/XSUB的額外功能。

顯然這個圖表並不涉及信息如何進入你的經紀人,你目前顯示的是一個XSUB套接字 - 這超出了你迄今爲止提供的信息的範圍,大概你能夠接收到你的信息經紀人已經成功,所以我不會處理。

我假設您的代理線程專用於每個服務正在做出是否將消息發送到其服務的智能選擇?如果是這樣,那麼您選擇讓他們訂閱所有內容應該可以正常工作,否則可能需要更智能的訂閱設置。

如果你使用你的服務過程中一個REP插座,然後該服務過程中必須採取的消息,並用它處理異步,從未傳送回該消息的到代理的任何細節。然後它必須通過確認(如「已收到」)對每條消息作出響應,以便遵循REP套接字的嚴格接收/發送/接收/發送模式。

如果您想要其他類型的有關服務如何處理髮送回代理的消息的通信,REP不再是您的服務進程的合適套接字類型,並且DEALER可能不再是您的正確套接字類型經紀人。如果您想要某種形式的負載平衡,以便您發送到下一個打開的服務進程,則需要使用ROUTER/REQ並讓每個服務指示其可用性,並讓代理持有該消息,直到下一個服務進程顯示其可用通過發回結果。如果你想要一些其他類型的消息處理,你必須指出那是什麼,所以可以提出一個合適的架構。

+0

非常感謝您提供詳細的答案,我顯然不明白REQ在發送其他消息之前期待回覆的事實。我已經實現了這個圖,並且在沒有錯誤的情況下,消息不會到達目的地。 SUB/DEALER代理訂閱了所有內容。你能否確認SUB/PUB和SUB/DEALER經紀人都在運行ZMQ.proxy? – Spiff

+0

使用ZMQ代理可能是實現它的首選方法。你知道郵件卡住的地方嗎?它通過多遠? – Jason

+0

我不知道,跟蹤它的最好方法是什麼? – Spiff

0

顯然,我搞混了幾個要素:

  • 套接字具有相同的API是否你使用它作爲一個客戶端套接字(Socket.connect)或服務器端的socket(Socket.bind
  • 套接字具有相同的API不管類型(例如Socket.subscribe不應該在PUSH插座的稱呼)
  • 一些插座類型需要發送/接收響應環(如REQ/REP
  • 在通信模式的一些細微差別(PUSH/PULL VS ROUTER/DEALER
  • 在調試ZeroMQ設置

所以非常感謝傑森對他極爲細緻的解答難度(impossiblity?)(和真棒圖!)是指我走向正確的方向。

我結束了以下設計:

  • 經紀人線程1上運行bind(localhost:6000)扇出XSUB/XPUB代理和bind(localhost:6001)
  • 經紀人線程2上運行connect(localhost:6001)bind(localhost:6002)排隊SUB/PUSH代理;經紀人線程3和4中使用具有不同的綁定端口號類似的設計
  • 消息生產者連接到使用一個PUB插座代理上connect(localhost:6000)
  • 消息消費者連接到使用一個PULL插座代理排隊代理上connect(localhost:6002)

在這個特定的服務排隊機制的頂部,我是能夠增加一個類似特定服務扇出機制,而不是簡單:

  • 經紀人線程運行的SUB/PUB代理上connect(localhost:6001)bind(localhost:6003)
  • 消息生產者仍然使用PUB插座連接到代理上connect(localhost:6000)
  • 消息使用者連接到使用SUB插座經紀人扇出代理上connect(localhost:6003)

這一直一個有趣的旅程。