2014-02-13 80 views
9

如何實現在幾次可配置重發嘗試後拒絕消息的機制?Node-amqp - X嘗試後拒絕消息

換句話說,如果我正在訂閱一個隊列,我想保證相同的消息不會再多發送X次。

我的代碼示例:

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) { 
    try{ 
    doSomething(data); 
    } catch(e) { 
    message.reject(true); 
    } 
} 
+0

可能唯一複製http://stackoverflow.com/q/17654475 – pinepain

+0

你是如何確定的消息?你在消息有效載荷中有你自己的ID嗎? –

+1

在我的情況(但我不是OP) - 是的,消息可以通過他們的UUID識別。唉,在訂閱者中只有一個簡單的計數器是不夠的,因爲我有多個訂閱者到同一個隊列來平衡工作,重試次數應該是全局的,而不是每個工作者本地的。 –

回答

1

在我看來,最好的解決方案是處理應用程序中的這些錯誤,並在應用程序決定無法處理消息時拒絕它們。

如果您不想丟失信息,應用程序只有在它將相同的消息發送到錯誤隊列後纔會拒絕該消息。

代碼沒有進行測試:

q.subscribe({ack: true}, function() { 
    var numOfRetries = 0; 
    var args = arguments; 
    var self = this; 
    var promise = doWork.apply(self, args); 
    for (var numOfRetries = 0; numOfRetries < MAX_RETRIES; numOfRetries++) { 
    promise = promise.fail(function() { return doWork.apply(self, args); }); 
    } 

    promise.fail(function() { 
    sendMessageToErrorQueue.apply(self, args); 
    rejectMessage.apply(self, args); 
    }) 
}) 
0

一個可能的解決方案是使用散列某種定義散列函數的消息,然後檢查緩存對象的散列。如果存在,請將緩存中的內容添加到可配置的最大值,如果不存在,請將其設置爲1.下面是一個快速而髒的原型(請注意,mcache對象應在所有訂戶的範圍內):

var mcache = {}, maxRetries = 3; 

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) { 
    var messagehash = hash(message); 
    if(mcache[messagehash] === undefined){ 
    mcache[messagehash] = 0; 
    } 
    if(mcache[messagehash] > maxRetries) { 
    q.shift(true,false); //reject true, requeue false (discard message) 
    delete mcache[messagehash]; //don't leak memory 
    } else { 
    try{ 
     doSomething(data); 
     q.shift(false); //reject false 
     delete mcache[messagehash]; //don't leak memory 
    } catch(e) { 
     mcache[messagehash]++; 
     q.shift(true,true); //reject true, requeue true 
    } 
    } 
} 

如果消息有一個GUID,你可以直接在散列函數中返回它。

+0

這引入了緩存作爲單點故障。我想有一個完全使用RabbitMQ技術的解決方案。 –

+0

您在哪種情況下預想緩存失敗? – gcochard

+0

緩存(物理上)需要在任何服務器上運行。如果這臺服務器停機怎麼辦?所以你需要高速緩存高可用性和故障安全,這增加了所需的工作量。如果有可能使用RabbitMQ本身,我會非常青睞這樣的解決方案。 –

相關問題