2015-05-01 28 views
0

我有一個Node.js項目將消息從AWS SQS(消息隊列)中消費。我想在同一個項目中啓動多個消費者,以跟上放在隊列中的消息,而不必創建另一個Node.js實例。有沒有可能像Fibers或其他框架那樣做?我基本上試圖讓接收消息多線程的行爲(競爭消費者模式)。Node.js在一個項目中產生多個SQS監聽器線程

下面是一個例子:

var AWS = require('aws-sdk'), 
    nconf = require('nconf'), 
    SQS_URL = process.env.SQS_SERVICES_EVENTS; 

AWS.config.update({accessKeyId: nconf.get("accessKeyId"), secretAccessKey: nconf.get("secretAccessKey")}); 
AWS.config.update({region: nconf.get("region")}); 

// Initialize SQS 
var sqs = new AWS.SQS(); 

// Params for SQS 
var MAX_NUM_MSGS = nconf.get("sqs.max.messages"); 
var params = { 
    QueueUrl: SQS_URL, 
    MaxNumberOfMessages: MAX_NUM_MSGS, 
    VisibilityTimeout: 30, 
    WaitTimeSeconds: 20 
}; 

exports.startSqsListener = setInterval(sqsListener, nconf.get("sqs.interval")); 

// Make multi-threaded 
function sqsListener() { 
    sqs.receiveMessage(params, function(err, data) { 
     if (err) { 
      logger.error(err, err.stack); 
     } 
     if (data.Messages) { 
      // do something with each message 
     } 
    }); 
} 

回答

0

如果你要處理更多的消息同時你可以寫一個異步函數來實現的邏輯「做每個消息的東西」。只要消息不需要按順序處理,節點就會自動排隊每個異步執行並同時運行它們。

sqs.receiveMessage(params, function(err, data) { 
    if (err) { 
     logger.error(err, err.stack); 
    } 
    if (data.Messages) { 
     data.Messages.forEach(function(message) { 
      processMessage(message, function(err) { 
       //check for error, do something with it 
      }); 
     }); 
    } 
}); 

function processMessage(messageData, callback) { 
    //move your processing here 

    callback(); //or send error back, if needed 
} 
+0

不,我需要在比這更高的水平上做到這一點。到此爲止,我已經收到了處理該部分的消息。我基本上想要設置多個接收器,因此調用導出'startSqsListener'的調用函數可以調用它'n'多次來啓動將單獨接收消息的工作人員。這是我試圖實施的競爭消費者EAI模式。 – occasl

+1

如果你想運行更多的監聽器函數,難道你不能簡單地多次調用'startSqsListener'函數嗎?每次調用都會執行'setTimeout'函數,生成另一個間隔執行的偵聽器。 SQS將隱藏基於'VisibilityTimeout'的消息,因此您將在每個偵聽器中收到不同批次的消息。 –

+0

我最終做的是讓調用函數調用導出的函數'n'次。由於我在'setInterval'上有這個輸出,它會持續運行尋找SQS消息。感謝您對此有所瞭解。 – occasl