2013-07-18 111 views
9

我想寫一個nodejs sqs隊列處理器。Nodejs sqs隊列處理器

"use strict"; 
var appConf = require('./config/appConf'); 
var AWS = require('aws-sdk'); 
AWS.config.loadFromPath('./config/aws_config.json'); 
var sqs = new AWS.SQS(); 
var exec = require('child_process').exec; 
function readMessage() { 
    sqs.receiveMessage({ 
    "QueueUrl": appConf.sqs_distribution_url, 
    "MaxNumberOfMessages": 1, 
    "VisibilityTimeout": 30, 
    "WaitTimeSeconds": 20 
    }, function (err, data) { 
    var sqs_message_body; 
    if (data.Messages) { 
     if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') { 
     //sqs msg body 
     sqs_message_body = JSON.parse(data.Messages[0].Body); 
     //make call to nodejs handler in codeigniter 
     exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"', 
      function (error, stdout, stderr) { 
      if (error) { 
       throw error; 
      } 
      console.log('stdout: ' + stdout); 
      if(stdout == 'Success'){ 
       //delete message from queue 
       sqs.deleteMessage({ 
       "QueueUrl" : appConf.sqs_distribution_url, 
       "ReceiptHandle" :data.Messages[0].ReceiptHandle 
       }); 
      } 
      }); 
     } 
    } 
    }); 
} 
readMessage(); 

上述代碼適用於隊列中的單個消息。我應該如何編寫此腳本,以便它可以繼續輪詢隊列中的郵件,直到處理完所有郵件?我應該使用設置超時嗎?所有的

回答

15

首先,你應該definetely使用長輪詢Amazon提供技術,據我所知,你已經在使用它,因爲你必須在sqs.receiveMessage通話"WaitTimeSeconds": 20說法。我希望你不要忘記在AWS Web interface中配置它。

關於輪詢消息 - 你可以使用不同的技術,包括定時器,但我認爲最簡單的將只是叫你readMessage()功能在receiveMessage的(甚至exec的)回調函數結束。因此,處理(或等待)隊列中的下一個消息將在隊列中的前一個消息的處理結束之後立即開始。

UPDATE:

至於我在新的代碼版本有許多readMessage()電話。我認爲最好將其最小化以使代碼更清晰易於維護。但是,如果你離開了,例如,在主回調結束時唯一的一個調用,你將會收到大量並行運行的PHP工作者腳本 - 從性能的角度來看可能並不是那麼糟 - 但是你將不得不添加一些複雜的腳本來控制並行工作者的數量。我認爲您可以在exec回撥中減少一些呼叫,嘗試加入if s並在主回叫中加入呼叫。

"use strict"; 
var appConf = require('./config/appConf'); 
var AWS = require('aws-sdk'); 
AWS.config.loadFromPath('./config/aws_config.json'); 
var delay = 20 * 1000; 
var sqs = new AWS.SQS(); 
var exec = require('child_process').exec; 
function readMessage() { 
    sqs.receiveMessage({ 
    "QueueUrl": appConf.sqs_distribution_url, 
    "MaxNumberOfMessages": 1, 
    "VisibilityTimeout": 30, 
    "WaitTimeSeconds": 20 
    }, function (err, data) { 
    var sqs_message_body; 
    if (data.Messages) 
     && (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined')) { 
     //sqs msg body 
     sqs_message_body = JSON.parse(data.Messages[0].Body); 
     //make call to nodejs handler in codeigniter 
     exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"', 
      function (error, stdout, stderr) { 
      if (error) { 
       // error handling 
      } 
      if(stdout == 'Success'){ 
       //delete message from queue 
       sqs.deleteMessage({ 
       "QueueUrl" : appConf.sqs_distribution_url, 
       "ReceiptHandle" :data.Messages[0].ReceiptHandle 
       }, function(err, data){     
       }); 
      } 
      readMessage();     
      }); 
     }   
    }   
    readMessage();   
    }); 
} 
readMessage(); 

關於內存泄漏:我認爲你不應該擔心,因爲readMessage()下一次調用回調函數發生了 - 所以不能遞歸調用,並遞歸調用的函數只調用函數receiveMessage()後返回值父功能。

+0

您好,請查看我的這個要點。 https://gist.github.com/yalamber/374add88e887e688d818 – Yalamber

+0

另外我應該擔心運行此腳本時有任何內存泄漏? – Yalamber

+0

@askkirati更新! – zavg

1

如果您正在使用節點,請使用https://www.npmjs.com/package/sqs-worker模塊。它會爲你做這項工作。

var SQSWorker = require('sqs-worker') 

var options = 
{ url: 'https://sqs.eu-west-1.amazonaws.com/001123456789/my-queue' 
} 

var queue = new SQSWorker(options, worker) 

function worker(notifi, done) { 
    var message; 
    try { 
    message = JSON.parse(notifi.Data) 
    } catch (err) { 
    throw err 
    } 

    // Do something with `message` 

    var success = true 

    // Call `done` when you are done processing a message. 
    // If everything went successfully and you don't want to see it any more, 
    // set the second parameter to `true`. 
    done(null, success) 
} 
+1

爲什麼倒票? https://github.com/BBC/sqs-consumer也是一種選擇 – Cmag