我有一個問題,在另一個消費者收到消息之前,大約20個AMQP消息發佈到RabbitMQ中的共享隊列中時發生批量延遲。我的應用程序拓撲有三個端點:爲什麼我的AMQP郵件在發送前後滾動
- 第一個端點將生成一批消息,每個消息都包含網絡設備的FQDN以從中收集配置文件。這個端點將有一個實例,並且是一個名爲net.svc.ssh的直接隊列的唯一發布者。
- 第二個端點將使用這些消息之一,SSH到指定的網絡設備,運行CLI命令並將每個設備的結果發佈到另一個隊列。這個端點將有多個實例,它們都從net.svc.ssh中消耗併發布到net.svc.savefile。
- 第三個端點將接收到該結果並將其保存到本地磁盤中的各自單獨文件中。這個實例是net.svc.savefile的唯一使用者,並且不發佈消息。
我看到的是,所有的第二個端點將「持有」他們的傳出消息,直到沒有消息留在ssh隊列中消耗。這會阻止將設備配置及時傳送到最終端點。我所觀察到的是,一旦所有中間端點都消耗了他們的信息,每個配置都會同時進入。
所以我的問題是爲什麼我會看到這種假脫機行爲?我希望我的消息在準備就緒後發佈,並且此行爲不適用。
端點1 - 網絡設備的備份請求的發送方:
EventMachine.add_timer(0) do
YAML::load_file('hosts.yaml').each do |fqdn,ip|
payload = { :fqdn => fqdn }
exchange.publish(payload.to_json, :routing_key => 'net.svc.ssh')
end
end
端點2 - SSH節點
mq_queue.subscribe(:ack => false) do |meta, mq_payload|
payload = JSON.parse(mq_payload)
# SSH stuff happens here
mq_channel.default_exchange.publish(payload.to_json,
:routing_key => 'net.svc.savefile',
)
end
end
端點3 - 節點,節省了設備配置到文件
queue.subscribe() do |meta, mq_payload|
payload = JSON.parse(mq_payload)
payload[ "host" ].each do |host,hash|
File.open(File.join('backups', host), "w") do |file|
result = hash[ "result" ]
if(result[ "error" ].nil?)
file << result[ "show running-config" ].join("\n")
else
file << result[ "error" ]
end
end
end
end