0
對於需求之一,我們需要跟蹤隊列深度併成功處理消息。這個想法是發佈消息並獲得成功和失敗消息的列表。爲了模擬要求我做了以下RabbitMQ:setReturnListner handleBasicReturn nt被調用未傳送的消息
- 發佈具有強制性和立即標誌的消息中發送channel.basicPublish「交流」,「RKEY」,真,假的道具,「Hello World」的.bytes
- 的消費者消費甚至標記(我已經把1..10中的數字作爲每個消息的標題中的標記值)並且不應答奇數編號的消息。
- 我已經在發佈者中實現了setReturnListnere來捕獲未傳遞的消息。
雖然我能夠通過messages_unacknowledged Rabbmitmqctl list_queues得到未確認的消息的數量,不知我的handleBasicReturn方法不會被調用。我錯過了一些東西。
代碼片段:
監製:
channel.setReturnListener(new ReturnListener() {
public void handleBasicReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties,
byte[] body)
throws IOException {
println "Debugging messages!!!!"
println "The details of returned messages are ${replyText} from ${exchange} with routingKey as ${routingKey} with properties"
}
});
println " queuename is ${dec.queue} and consumerCount is ${dec.consumerCount} messageCount is ${dec.messageCount}"
(1..10).each {
println "Sending file ${i}....."
def headers = new HashMap<String,Object>()
headers.put "operatiion","scp"
headers.put "dest","joker.dk.mach.com"
headers.put "id", i
println headers
BasicProperties props = new BasicProperties(null, null, headers, null, null, null, null, null,null, null, null, null,null, null)
channel.basicPublish 'exchange' ,'rKey',true,false, props,"Hello Worls".bytes
i++
}
channel.close()
消費者:
while (true) {
def delivery = consumer.nextDelivery()
def headers = delivery?.properties?.headers
def id = headers.get("id")
println "Received message:"
println " ${id.toString()}"
if(id % 2 == 0){
channel.basicAck delivery.envelope.deliveryTag, false
}
}