2012-01-13 32 views
1

我有一個問題,在另一個消費者收到消息之前,大約20個AMQP消息發佈到RabbitMQ中的共享隊列中時發生批量延遲。我的應用程序拓撲有三個端點:爲什麼我的AMQP郵件在發送前後滾動

  1. 第一個端點將生成一批消息,每個消息都包含網絡設備的FQDN以從中收集配置文件。這個端點將有一個實例,並且是一個名爲net.svc.ssh的直接隊列的唯一發布者。
  2. 第二個端點將使用這些消息之一,SSH到指定的網絡設備,運行CLI命令並將每個設備的結果發佈到另一個隊列。這個端點將有多個實例,它們都從net.svc.ssh中消耗併發布到net.svc.savefile。
  3. 第三個端點將接收到該結果並將其保存到本地磁盤中的各自單獨文件中。這個實例是net.svc.savefile的唯一使用者,並且不發佈消息。

我看到的是,所有的第二個端點將「持有」他們的傳出消息,直到沒有消息留在ssh隊列中消耗。這會阻止將設備配置及時傳送到最終端點。我所觀察到的是,一旦所有中間端點都消耗了他們的信息,每個配置都會同時進入。

所以我的問題是爲什麼我會看到這種假脫機行爲?我希望我的消息在準備就緒後發佈,並且此行爲不適用。

端點1 - 網絡設備的備份請求的發送方:

EventMachine.add_timer(0) do 
    YAML::load_file('hosts.yaml').each do |fqdn,ip| 
    payload = { :fqdn => fqdn } 
    exchange.publish(payload.to_json, :routing_key => 'net.svc.ssh') 
    end 
end 

端點2 - SSH節點

mq_queue.subscribe(:ack => false) do |meta, mq_payload| 
    payload = JSON.parse(mq_payload) 

    # SSH stuff happens here 

    mq_channel.default_exchange.publish(payload.to_json, 
             :routing_key => 'net.svc.savefile', 
             ) 
    end 
end 

端點3 - 節點,節省了設備配置到文件

queue.subscribe() do |meta, mq_payload| 
    payload = JSON.parse(mq_payload) 
    payload[ "host" ].each do |host,hash| 
    File.open(File.join('backups', host), "w") do |file| 
     result = hash[ "result" ] 
     if(result[ "error" ].nil?) 
     file << result[ "show running-config" ].join("\n") 
     else 
     file << result[ "error" ] 
     end 
    end 
    end 
end 

回答

1

在我看來,你正在向具有默認行爲的主題交換髮布消息。我假設你的消費者(有多個實例)訂閱了一個簡單綁定路由密鑰的隊列,即多個消費者將輪詢循環消息。

或者也許只有一個消費者。

我還沒有使用Ruby多年,所以我不熟悉各種庫的怪癖,但我看不到消費者在處理它們之後應答消息的位置。在消費者2上,你似乎已經明確地將auto-ack設置爲FALSE,這很好,而在消費者3中,你還沒有設置它,所以這可能意味着auto-ack正在發生。

當我寫一個消費者,關於另一個隊列公佈結果,我做事情的順序:

  1. 得到傳入消息
  2. 做的工作
  3. 發佈新的消息,結果隊列
  4. 確認收到的消息
相關問題