2015-03-03 20 views
1

我實際嘗試使用node-amqp(https://github.com/postwait/node-amqp)實現pub/sub模式。Node-amqp和socket.io奇怪的行爲

我有一些問題來實現它。

我需要什麼:

  • 由用戶發佈消息
  • 廣播給其他用戶
  • 發送到離線將他們將在下次連接時使用它的用戶信息

我親眼:

(function() { 

    var amqp = require('amqp'); 

    var connection = amqp.createConnection({ host: 'http://127.0.0.1:5672/' }); 
    var app = require('express')(); 
    var server = require('http').Server(app); 
    var io = require('socket.io')(server); 

    app.get('/', function (req, res) { 
     res.sendfile(__dirname + '/index.html'); 
    }); 

    server.listen(8888); 

// Wait for connection to become established. 


    connection.on('ready', function() { 

     var sendMessage = function (queue, msg) { 
      connection.publish(queue, JSON.stringify(msg)); 
     } 


     io.sockets.on('connection', function (socket) { 

      socket.on('message', function (msg) { 
       sendMessage('my-queue', msg); 
      }); 

      connection.queue('my-queue', {autoDelete: false}, function (q) { 
       q.bind('#'); 

       q.subscribe(function (message) { 
        socket.broadcast.emit('news',message); 
       }); 
      }); 

     }); 
    }); 
})() 
  • 在index.html頁面,我連接到插座服務器
  • 我有一個按鈕,發送消息
  • 我打開索引頁在兩個不同的瀏覽器,我的用戶都連接
  • 如果我發送消息到服務器,它將它發送給其他用戶
  • 如果我發送第二條消息到服務器,它將消息發送給發送該消息的用戶。

它的開關,每對消息(因爲我有兩個用戶),其他用戶得到的消息,如果它是一個IMPAIR消息,當前用戶發送消息接收消息。 這是什麼行爲?

你能幫我糾正我的代碼來實現我的需求一個好方法嗎?

注:我在Windows使用RabbitMQ的標準配置7 x64的計算機

編輯:我做了一個解決方案,每個消費者都能用得到的消息:

(function() { 

    var amqp = require('amqp'); 

    var connection = amqp.createConnection({ host: 'http://127.0.0.1:5672/' }); 
    var app = require('express')(); 
    var server = require('http').Server(app); 
    var io = require('socket.io')(server); 

    app.get('/', function (req, res) { 
     res.sendfile(__dirname + '/index.html'); 
    }); 

    server.listen(8888); 

// Wait for connection to become established. 


    connection.on('ready', function() { 


     connection.exchange('logs', {type: 'fanout', autoDelete: false}, function (exchange) { 

      var sendMessage = function (queue, msg) { 
       exchange.publish(queue, JSON.stringify(msg)); 
      } 

      io.sockets.on('connection', function (socket) { 

       socket.on('message', function (msg) { 
        sendMessage('', msg); 
       }); 

       connection.queue(socket.id, {exclusive: true}, function (q) { 
        q.bind('logs', ''); 

        q.subscribe(function (message) { 
         socket.emit('news', message); 
        }); 
       }); 

      }); 
     }); 
    }); 
})() 

我的最後一個問題是,我現在不能管理離線消息...任何解決方案? (明天賞金結束: - /)

回答

3

問題是RabbitMQ將故意將每條消息發送給單個用戶。該用戶確認收到該消息(amqp does this for you automatically),然後就RabbitMQ而言完成工作,因此它會刪除該消息。

您的用戶輪流接收消息的原因是RabbitMQ試圖將傳入消息均勻分散到用戶的負載上。

您的問題已在here之前得到解答。檢查出來解決您的問題!

2

您需要爲每個消費者(在您的案例 - 用戶中)創建單獨的隊列,並將消息從交換路由到所有消息隊列。這樣,當您發佈消息時,它將被放置在所有用戶的隊列中,每個隊列中的每個隊列都可以獨立使用它。

你有這樣的: http://www.rabbitmq.com/tutorials/tutorial-two-python.html

而你需要這樣的: http://www.rabbitmq.com/tutorials/tutorial-three-python.html

+0

謝謝您的幫助。現在我遇到了有關離線數據的問題......您能幫我嗎?我編輯了我的帖子 – mfrachet 2015-03-17 08:05:06