2011-07-19 51 views
8

我正在嘗試使用AMQP,Websockets和Ruby構建一個簡單的聊天應用程序。我明白,這可能不是理解AMQP的最佳用例,但我想知道我錯在哪裏。AMQP創建訂閱動態隊列

以下是我的AMQP的服務器代碼

require 'rubygems' 
require 'amqp' 
require 'mongo' 
require 'em-websocket' 
require 'json' 

class MessageParser 
    # message format => "room:harry_potter, nickname:siddharth, room:members" 
    def self.parse(message) 
    parsed_message = JSON.parse(message) 

    response = {} 
    if parsed_message['status'] == 'status' 
     response[:status] = 'STATUS' 
     response[:username] = parsed_message['username'] 
     response[:roomname] = parsed_message['roomname'] 
    elsif parsed_message['status'] == 'message' 
     response[:status] = 'MESSAGE' 
     response[:message] = parsed_message['message'] 
     response[:roomname] = parsed_message['roomname'].split().join('_') 
    end 

    response 
    end 
end 

class MongoManager 
    def self.establish_connection(database) 
    @db ||= Mongo::Connection.new('localhost', 27017).db(database) 
    @db.collection('rooms') 

    @db 
    end 
end 


@sockets = [] 
EventMachine.run do 
    connection = AMQP.connect(:host => '127.0.0.1') 
    channel = AMQP::Channel.new(connection) 

    puts "Connected to AMQP broker. #{AMQP::VERSION} " 

    mongo = MongoManager.establish_connection("trackertalk_development") 

    EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws| 
    socket_detail = {:socket => ws} 
    ws.onopen do 
     @sockets << socket_detail 

    end 

    ws.onmessage do |message| 

     status = MessageParser.parse(message)   
     exchange = channel.fanout(status[:roomname].split().join('_')) 

     if status[:status] == 'STATUS'    
     queue = channel.queue(status[:username], :durable => true) 

     unless queue.subscribed? 
     puts "--------- SUBSCRIBED --------------" 
     queue.bind(exchange).subscribe do |payload| 
      puts "PAYLOAD : #{payload}" 
      ws.send(payload) 
      end 
     else 
      puts "----ALREADY SUBSCRIBED" 
     end     

     # only after 0.8.0rc14 
     #queue = channel.queue(status[:username], :durable => true)  
     #AMQP::Consumer.new(channel, queue)   

     elsif status[:status] == 'MESSAGE' 
     puts "********************* Message- published ******************************" 
     exchange.publish(status[:message) 
     end     
    end 

    ws.onclose do 
     @sockets.delete ws 
    end 
    end  
end 

我使用狀態指示輸入消息是否是正在進行的聊天或需要我來處理家務事像訂閱隊列中的狀態消息的消息。

我所面臨的問題是,當我發送郵件一樣 socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

exchange.publish' is called but it still doesn't get pushed via the ws.send`到瀏覽器。

我對EventMachine和AMQP的理解有沒有根本性的錯誤?

下面是相同的代碼pastie http://pastie.org/private/xosgb8tw1w5vuroa4w7a

我的代碼似乎希望在工作的時候取出durable => truequeue = channel.queue(status[:username], :durable => true)

以下是我的Rails的片段查看標識用戶的用戶名和房間名稱並通過Websockets將其作爲消息的一部分發送。

雖然代碼似乎工作,當我刪除durable => true我不明白爲什麼會影響被傳遞的消息。請忽略mongo的一部分,因爲它還沒有發揮任何作用。

我也想知道,如果我的方法來AMQP和它的使用是正確的

<script> 
    $(document).ready(function(){ 
     var username = '<%= @user.email %>'; 
     var roomname = 'Bazingaa'; 

     socket = new WebSocket('ws://127.0.0.1:8080/'); 

     socket.onopen = function(msg){ 
      console.log('connected'); 
      socket.send(JSON.stringify({status:'status', username:username, roomname:roomname})); 
     } 

     socket.onmessage = function(msg){ 
      $('#chat-log').append(msg.data); 

     } 

    }); 

</script> 
<div class='block'> 
    <div class='content'> 
    <h2 class='title'><%= @room.name %></h2> 
    <div class='inner'> 
     <div id="chat-log"> 
     </div> 

     <div id="chat-console"> 
     <textarea rows="5" cols="40"></textarea> 
     </div> 
    </div> 
    </div> 
</div> 

<style> 
    #chat-log{ 
     color:#000; 
     font-weight:bold; 
     margin-top:1em; 
     width:900px; 
     overflow:auto; 
     height:300px; 
    } 
    #chat-console{ 
     bottom:10px; 
    } 

    textarea{ 
     width:100%; 
     height:60px; 
    } 
</style> 
+0

如果我將amqp代碼作爲守護程序運行,是否有人可以告訴我如何在生產環境中組織我的代碼。任何能夠幫助我組織代碼的示例代碼都會有很大的幫助。 – Sid

回答

1

我認爲你的問題可能是隊列掛在ws.onmessage調用之間的代理上。當客戶端重新連接隊列並且綁定已經存在時,ws.send()不會被調用。

默認情況下,當您創建隊列時,該隊列及其所具有的任何綁定都將掛起,直到代理重新啓動,或者明確告訴代理將其刪除。

有兩種辦法可以改變:

  • 添加耐用標誌,當你創建隊列,這將導致隊列堅持圍繞即使代理重新啓動
  • 添加auto_delete標誌,這將導致經紀人在沒有消費者附加的短時間後自動刪除該實體

如果您可以控制使用rabbitmq代理的代理,則反省代理上發生的事情的簡單方法是安裝management plugin,該代理爲代理上的交換,綁定和隊列提供Web界面。

0

在第一次看AMQP位似乎是美好的,但我不想設置所有依賴。如果您只提供AMQP部分的最小示例,我會檢查它。

+0

在我的Rails代碼中,我標識用戶的電子郵件ID和Roomname,並通過Websockets將它傳遞給AMQP服務器代碼。 雖然代碼似乎工作,當我刪除持久=>真我不明白爲什麼會影響被傳遞的消息。請忽略mongo的一部分,因爲它還沒有發揮任何作用。 我也想知道我的AMQP方法及其用法是否正確 – Sid