2015-12-07 66 views
1

我正在使用rabbitmqamqp protocol在我的應用程序中進行聊天。我想獲得acknowledgement成功發送給發件人的消息。使用amqp從rabbitmq隊列收到的消息的確認

形成的通道

channel = AMQP.channel 

創建於RabbitMQ的一個隊列

channel.queue(receiver_id, :auto_delete => false, durable: true) 

通道扇出

sender_exchange = channel.fanout(sender_id+"exchange") 

渠道發佈

channel.publish(message) 

現在我想acknowledged的消息使用Ruby on Rails的接收發送器,讓我知道我使用哪種方法來獲得acknowledgement

回答

2

答案真的取決於「確認收到的消息」應該在您的應用程序中的含義。這裏有兩種可能性:

  1. 該消息已被經紀人
  2. 消息已收到一個或更多的消費者在代理

在你的聊天功能獲得,應該確認的意思選項1(消息是從你的客戶端到服務器)還是選項2(消息是從你的客戶端到另外一個或多個客戶端)?

Confirms (Publisher Acknowledgements)是RabbitMQ的一項功能,該功能擴展了AMQP以確認發佈者已收到經紀人的消息。這可能是選項1的最佳解決方案。它可能不是您想要的選項2,因爲它將確認即使該消息未被路由到任何消費者。請參閱「何時會確認消息?」部分在上面鏈接的文檔中。

選項2有點像電子郵件客戶端中的「已讀回執」。它將要求接收者在收到消息並顯示消息後發送收據消息。

要在rails中實現確認,假設您使用Bunny作爲您的AMQP客戶端,請遵循Publisher確認示例here。它可能是這個樣子:

# wherever I create my channel 
channel.confirm_select 

# publish my messages 

channel.wait_for_confirms 
0

如果你想在被消費者(它聽起來並不像你知道當它是由經紀人收到後收到消息知道;如果所以,@Ryan Hoegg的答案會爲你提供你所需要的),你需要做一個高度異步的變體RPC pattern

RPC通常是同步的:你發送一個命令(在這種情況下通過RabbitMQ隊列),並在運行完成後立即得到響應。在「閱讀回執」的情況下,該命令將來可能會執行很長時間。

如果你堅持使用RabbitMQ,下面是你如何在概念上實現讀 - 收據。這需要更多的代碼,因爲RabbitMQ不會自動支持/很長一段時間沒有原始語言,請看是否收到了某些東西。閱讀回執可以使用一個隊列(以及恆定的空間;每個聊天(對等對)沒有備份的讀取通知消息填充RabbitMQ)來實現。

一般模式

說愛麗絲希望從她的消息發送給鮑勃的讀收據。

  1. 給每個消息一個monotonically increasing數字標識符。第一條消息是#1,第二條消息是#2,依此類推。 RabbitMQ中的message_id字段非常適合此目的。 Alice應該用增加的ID手動「標記」每條消息。

實施例(愛麗絲):

message_number = message_number + 1 
sender_exchange.publish(message_body, :message_id => message_number) 
  • Alice的客戶磁道 「?具有鮑勃讀我的消息」通過存儲兩個數字:Alice發送的編號最大的消息和Bob讀取的最近消息。所以如果Alice的「閱讀狀態」是3,這意味着Alice已經確認Bob已經閱讀了消息1,2和3.當Alice的客戶端啓動時,它應該爲該對話聲明唯一命名的隊列alice_bob_read_receipts(或對應於receiver_id變量的東西;只要它是唯一的)。宣言應該是冪等的/無條件的。應該聲明該隊列的x-max-length爲1和x-overflowdrop-head(如果在老版本的RabbitMQ上,x-overflow可能不被支持,但默認行爲將是你想要的)。
  • 實施例(愛麗絲):

    read_receipt_queue = channel.queue(
        "#{receiver_id}_#{sender_id}_read_receipts", 
        :arguments => { 
        "x-max-length".to_sym => 1, 
        "x-overflow".to_sym => "drop-head" 
    }) 
    
  • 當Bob讀取的消息(例如,通過滾動到它或打開應用程序),Bob的客戶端應發佈的消息到帶有讀取消息ID的alice_bob_read_receipts隊列。 RabbitMQ元數據字段旨在用於這樣的用途,所以它可以用來存儲Bob讀取的消息ID。
  • 實施例(鮑勃):

    read_receipt_queue = channel.queue(
        "#{receiver_id}_#{sender_id}_read_receipts", 
        :arguments => { 
        "x-max-length".to_sym => 1, 
        "x-overflow".to_sym => "drop-head" 
    }) 
    exchange = read_receipt_queue.default_exchange 
    
    # Assume we've got a list of messages Bob has "read" somewhere 
    # and each item in that list is the exact same triple as received 
    # by the Queue#consume method's block: 
    # http://rubybunny.info/articles/queues.html#handling_messages_with_a_block 
    viewed_messages.each do |delivery_info, properties, payload| 
        exchange.publish("", :correlation_id => properties.message_id) 
    end 
    
  • 然後Alice的客戶端應訂閱讀收據隊列(因爲這可以在輪詢的基礎上進行,則如果你願意,可以跳過消費者邏輯,只需使用Queue#pop,但效率較低,可能會阻止你的程序)。如果消費者獲得任何數據,則愛麗絲應設置她的「鮑勃已經讀取了高達此數字的消息」,以抵消彈出消息correlation_id中收到的值,然後更新UI /讀取回執狀態。只有在收到的值大於Alice以前知道的已讀狀態(因爲郵件可能丟失或重新發送)時才應該執行此更新。
  • 實施例:

    read_receipt_queue.subscribe(...) do |delivery_info, properties, payload| 
        if properties.correlation_id > known_messages_bob_read 
        known_messages_bob_read = properties.correlation_id 
        end 
    end 
    

    可能的改進

    有改善 「讀回執」 圖案的可靠性和/或行爲的幾種方法:

    • 的示例我假定所有同行都可以將他們的最後發送消息和最後收到的收據號存儲在可靠的數據庫中。如果這是不可能的,則必須放棄單調計數器,並且每次對等體連接到會話時從一定數量開始。這工作正常;你的同事們必須意識到,他們可能會看到比他們以前意識到的更低的消息數量;您可以使用輔助客戶端會話標識檢測無序/重試消息傳遞與意外更低的號碼之間的區別,以及重新連接的客戶端以低號碼開始的區別。
    • 如果您的消息歷史記錄是短暫的(每次聊天后消失),或者您想成爲真的確定重新連接客戶端既不重複也不錯過消息或閱讀回執,您需要進行某種形式的握手協商會話首次啓動時的同伴。這樣,兩個對等端都可以得到一個保證唯一的共享會話ID。由於RabbitMQ位於所有這些中間,因此您無法使用現有的庫(例如SSL)用於此目的,因此您必須自行推出。如果你這樣做,不要認爲它是一個安全功能(除了它以外還使用真正的安全性),只是一個狀態同步功能。
    • 如果您的消息歷史記錄是而不是 transient(您的所有消息都位於某個可靠的數據庫中),它可能還不會使用RabbitMQ來同步讀取回執。如果您可以在數據庫中存儲每條消息或單元「最大讀取消息ID」的「讀取或不讀取」位,則客戶端可以輪詢該數據庫以更新其讀取收據。如果輪詢速度太慢或性能受到影響,您可以使用我的示例中提到的已讀回執隊列,但在客戶端重新連接時回退到數據庫。或者,只讀發送「輪詢讀取通知數據庫」位而不是計數器,可以使用讀取收件隊列來減少輪詢。
    相關問題