2017-09-06 27 views
0

每當我發佈新消息時,它都會創建一個新連接。我想只有一個連接和一個通道用於所有公共電話。對於節點應用程序只需要一個RbbitMQ連接,而不是每個發佈調用

從rabbitmq網站閱讀: 某些應用程序需要多個連接到AMQP代理。但是,不希望同時打開許多TCP連接,因爲這樣做會佔用系統資源並使配置防火牆變得更加困難。 AMQP 0-9-1連接與可被認爲是「共享單個TCP連接的輕量級連接」的通道複用。

但是如何? 這裏是我的代碼:

Channel.js

var amqp = require('amqplib/callback_api'); 

var url = process.env.AMQP_URL || 'amqp://guest:[email protected]:5672'; 

module.exports = createQueueChannel; 

function createQueueChannel(queue, cb) { 
    console.log("connecting................"); 
    amqp.connect(url, onceConnected); 

    function onceConnected(err, conn) { 
    if (err) { 
     console.error('Error connecting:', err.stack); 
    } 
    else { 
     console.log('connected'); 
     conn.createChannel(onceChannelCreated); 
    } 

    function onceChannelCreated(err, channel) { 
     if (err) { 
     cb(err); 
     } 
     else { 
     channel.assertQueue(queue, {durable: true}, onceQueueCreated); 
     } 

     function onceQueueCreated(err) { 
     if (err) { 
      cb(err); 
     } 
     else { 
      cb(null, channel, conn); 
     } 
     } 
    } 
    } 

} 

Publish.js

var Channel = require('./channel'); 

var queue = 'queue'; 

Channel(queue, function(err, channel, conn) { 
    if (err) { 
    console.error(err.stack); 
    } 
    else { 
    console.log('channel and queue created'); 
    var work = 'Do some work'; 
    channel.sendToQueue(queue, encode(work), { 
     persistent: true 
    }); 
    // setImmediate(function() { 
    // channel.close(); 
    // conn.close(); 
    // }); 
    } 
}); 


function encode(doc) { 
    return new Buffer(JSON.stringify(doc)); 
} 

回答

0

定義您的發佈功能外連接(amqpConn)和發行渠道(pubChannel),並使用該頻道,當你發佈消息。

我建議你看一看完整的示例代碼瀏覽:https://gist.github.com/carlhoerberg/006b01ac17a0a94859bahttps://www.cloudamqp.com/blog/2015-05-19-part2-2-rabbitmq-for-beginners_example-and-sample-code-node-js.html) 哪裏還脫機隊列中情況下使用的連接被中斷了一段時間。

並且當連接時,您啓動發佈者。

function whenConnected() { 
    startPublisher() 
} 


var pubChannel = null; 
var offlinePubQueue = []; 
function startPublisher() { 
    amqpConn.createConfirmChannel(function(err, ch) { 
    if (closeOnErr(err)) return; 
    ch.on("error", function(err) { 
    console.error("[AMQP] channel error", err.message); 
    }); 
    ch.on("close", function() { 
    console.log("[AMQP] channel closed"); 
    }); 

    pubChannel = ch; 
    while (true) { 
    var m = offlinePubQueue.shift(); 
    if (!m) break; 
    publish(m[0], m[1], m[2]); 
    } 
}); 

}

和發佈功能,如:

function publish(exchange, routingKey, content) { 
    try { 
    pubChannel.publish(exchange, routingKey, content, { persistent: true }, 
     function(err, ok) { 
     if (err) { 
      console.error("[AMQP] publish", err); 
      offlinePubQueue.push([exchange, routingKey, content]); 
      pubChannel.connection.close(); 
     } 
     } 
    ); 
    } catch (e) { 
    console.error("[AMQP] publish", e.message); 
    offlinePubQueue.push([exchange, routingKey, content]); 
    } 
} 
相關問題