2012-06-28 42 views
1

是否有一種常見模式用於使0mq pub/sub中的發佈者在節點中冗餘?動機是能夠與可能會失敗/定期重新啓動的發佈者一起運行多個進程。如何在Node.js中創建一個ZeroMQ發佈者冗餘?

我最初的想法是在主創建一個轉發器,並從工人出版商連接到它:

var cluster = require('cluster') 
    , zmq = require('zmq') 
    , endpointIn = 'ipc:///tmp/cluster_pub_sub' 
    , endpointOut = 'tcp://127.0.0.1:7777'; 

if (cluster.isMaster) { 
    for (var i = 0; i < 2; i++) cluster.fork(); 
    startPubSubForwarder(); 
} else { 
    startPublisher(); 
} 

function startPublisher() { 
    var socket = zmq.socket('pub'); 
    socket.connect(endpointIn); 
    setInterval(function() { 
    socket.send('pid=' + process.pid); 
    }, 1000); 
} 

function startPubSubForwarder() { 
    var sIn = zmq.socket('sub') 
    , sOut = zmq.socket('pub'); 

    // incoming 
    sIn.subscribe(''); 
    sIn.bind(endpointIn, function (err) { 
    if (err) throw err; 
    }); 
    sIn.on('message', function (data) { 
    sOut.send(data); 
    }); 

    // outgoing 
    sOut.bind(endpointOut, function (err) { 
    if (err) throw err; 
    }); 
} 

這樣做的還有其他/更好的方法?

回答

0

如果您的擔心是消息持久性,那麼我認爲您不必擔心有多個發佈者,更擔心在發佈者死後確保郵件不會丟失。您可以立即重新啓動發佈服務器,然後選擇停止發佈的位置。您還需要知道哪些消息已成功發送。

這需要的是1)持久性存儲和2)以及向發佈者確認消息在接收端被接收(並且可能處理完成)的手段。這個設置應該解決您的可靠性需求。

如果你還想完成高規模,那麼你需要增加一點體系結構。對於發送方和接收方爲1:1的發送/接收方案來說更直接,當您需要執行1:N輪循機制/負載分配方案時更復雜一些,這可能是您需要的擴展。

我上完成的升級場景輸入是具有以下設置:

sender_process - (1:1) - >分配器 - (1:N) - > receiver_process(ES )

其中分發者確認收到來自發送者的消息,然後扇出到接收者進程。

您可能希望通過在每個進程前面放置一個隊列來完成此操作。所以,你不會發送進程。您發送到進程讀取的隊列。發件人將東西放在分配器隊列中。經銷商將東西放在接收器的隊列中。在每個點上,每個進程都試圖進行處理。如果它失敗了一些最大重試次數,它會進入一個錯誤隊列。

我們使用rabbitmq/amqp來完成所有這些工作。我已經開始公開我們用來執行1:1和1:N發送的總線:https://github.com/mateodelnorte/servicebus。我將在接下來的幾天內添加自述文件和更多測試。

+0

(我的問題肯定是關於你的第二個例子)。我擔心「貨運代理商/分銷商」箱子正在倒塌。我怎樣纔能有兩個或更多的這些「轉發/分銷」流程? HAProxy(或任何其他TCP代理)是唯一的方法嗎?或者0mq中有什麼可以執行負載平衡/故障轉移? –

+0

您不希望每個消息類型使用多個分發服務器。您的一位經銷商將從1個發件人發送郵件到N個收件人。 您應該只能使用0mq作爲您的分銷商的運輸工具。您必須編寫一些自定義代碼來訂閱一條消息併發送給N個偵聽器,以及任何錯誤隊列類型方案。 – MateodelNorte

+0

還有一件需要注意的事情: 由於您的郵件是持久性的,如果分銷商發生故障,它會在郵件被保存在隊列中時從其停止的地方開始發送。 – MateodelNorte

0

從您的示例代碼中,我認爲XPUB/XSUB 0MQ模式是您最合適的。這是實現同一個「startPubSubForwarder()」塊的更有效的方式,而且您的訂閱方可以直接在發佈服務器後端訂閱特定模式。在這裏,我用publishers/xpub-xsub(以代理方式)/下標:https://github.com/krakatoa/node_zmq_workshop/tree/master/03.3_news_proxy的例子離開了一個鏈接。這是NodeJS代碼(它是我的,不介意問細節!)。