2017-05-09 70 views
3

我想在RabbitMQ上使用頭交換,混合java和python組件,我需要確認交付。Rabbitmq頭交換和確認交付

我似乎從python(pika)和java客戶端獲得不同的行爲。

在蟒蛇:

channel.exchange_declare(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ ¦type='headers', 
¦ ¦ ¦ ¦ ¦ ¦ ¦durable=True) 
channel.confirm_delivery() 
result = channel.basic_publish(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ routing_key='', 
¦ ¦ ¦ ¦ ¦ ¦ mandatory=True, 
¦ ¦ ¦ ¦ ¦ ¦ body=message, 
¦ ¦ ¦ ¦ ¦ ¦ properties=pika.BasicProperties(
¦ ¦ ¦ ¦ ¦ ¦ ¦ delivery_mode=2, 
¦ ¦ ¦ ¦ ¦ ¦ ¦ headers=message_headers)) 

如果頭不匹配任何綁定的消費和無法路由的消息,結果是假

但在Java /斯卡拉:

channel.exchangeDeclare("headers_test", "headers", true, false, null) 
channel.confirmSelect 

val props = MessageProperties.PERSISTENT_BASIC.builder 
¦ ¦ ¦ ¦ .headers(messageHeaders).build 
channel.basicPublish("headers_test", 
¦ ¦ ¦ ¦ ¦ ¦"", //routingKey 
¦ ¦ ¦ ¦ ¦ ¦true, //mandatory 
¦ ¦ ¦ ¦ ¦ ¦props, 
¦ ¦ ¦ ¦ ¦ ¦"data".getBytes) 
channel.waitForConfirmsOrDie() 

在這裏,當messageHeaders找不到匹配時,消息似乎只是被丟棄而沒有錯誤

我錯過了什麼或兩個客戶的行爲真的不一樣嗎?我怎樣才能得到證實交付使用在Java中的頭交換?

注意:我已經有了一個「複雜」交換來排隊路由設置,我寧願避免將死信路由添加到遊戲中,而只是發送失敗。

回答

1

即使沒有與您的標題匹配的隊列,也會確認郵件被確認的問題。從文檔(https://www.rabbitmq.com/confirms.html):

對於不可路由的消息,一旦 交換驗證的消息不會路由到任何隊列(返回隊列空 列表)的經紀人會發出確認。如果該消息也作爲強制發佈,則在basic.ack之前將basic.return發送給客戶端。對於否定確認(basic.nack), 也是如此。

相反,您應該檢查basic.return消息以檢測郵件是否已被路由。

我用wireshark檢查過,並且實際上我可以看到,如果郵件沒有路由,那麼會出現AMQP basic.return郵件。

我supppose你應該

channel.addReturnListener(new ReturnListener() { 
    @Override 
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("App.handleReturn"); 
    System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
    } 
}); 

的確開始,如果消息尚未排到我得到這個:

replyCode = [312],replyText = [NO_ROUTE],交換= [headers_logs], routingKey = [],親....

此外,如果你想模仿Java中鼠兔的同步行爲似乎可以d通過在發佈消息並註冊確認偵聽器而不是依賴.waitForConfirmsOrDie()之前獲取當前發佈序號。

所以,一個完整的代碼示例將是:

channel.addReturnListener(new ReturnListener() { 
     @Override 
     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
     System.out.println("App.handleReturn"); 
     System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
     } 
    }); 

    channel.addConfirmListener(new ConfirmListener() { 
     @Override 
     public void handleAck(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleAck"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 

     @Override 
     public void handleNack(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleNack"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 
}); 

long nextPublishSeqNo = channel.getNextPublishSeqNo(); 
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo); 

channel.basicPublish("headers_logs", 
    "", 
    true, 
    props, 
    "data".getBytes()); 

而你需要找你發佈消息之前有一個頻道的發佈序列號返回/確認回調的內部。

如果你看看線路上發生了什麼,如果消息沒有被路由到任何隊列,RabbitMq發回一個basic.return消息,其中還包含確認(交付標籤)。如果消息已經被路由,RabbitMq發回一個單一的bacic.ack消息,其中也包含一個確認。

看來,RabbitMQ的Java客戶端總是調用basicReturn()回調basicConfirm(前),所以邏輯來判斷郵件是否已被路由與否可以這樣:

註冊回報,並確認在監聽器渠道; 記住一個頻道的下一個發佈序號; 等待退貨或確認回調。如果是回撥回傳 - 郵件尚未被路由,您應該忽略對同一個投放標籤的進一步確認。如果您在收到handleReturn()之前收到handleAck()回調,則表示消息已被路由到隊列。

雖然我不確定在哪種情況下可以調用.handleNack()。