2016-06-12

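RabbitMQ messages with Qpid 0.32 client

Using the Qpid Java client, I can only get messages delivered through the exchange to the bound queue when I use the following extended form of AMQAnyDestination: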

Destination queue = new AMQAnyDestination(new AMQShortString("onms2"), 
               new AMQShortString("direct"), 
               new AMQShortString("Simon"), 
               true,   
               true,   
               new AMQShortString(""), 
               false,  
               bindvars); 

If I try a different form that specifies only the address, as follows, it does not work:

Destination queue = new AMQAnyDestination("onms2/Simon"); 

The message reaches RabbitMQ fine, but it is not delivered.

Qpid 0.32 client, RabbitMQ 3.5.7

Exchange: ONMS, routing key: Simon

I have been using the Qpid examples and modified the ListSender example as follows:

package org.apache.qpid.example; 

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.Message; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 

import org.apache.qpid.client.AMQAnyDestination; 
import org.apache.qpid.client.AMQConnection; 

import org.apache.qpid.framing.AMQShortString; 
import org.apache.qpid.jms.ListMessage; 


public class ListSender { 

public static void main(String[] args) throws Exception 
{ 
    Connection connection = 
     new AMQConnection("amqp://simon:[email protected]/test?brokerlist='tcp://localhost:5672'"); 
    AMQShortString a1 = new AMQShortString(""); 
    AMQShortString a2 = new AMQShortString(""); 
    AMQShortString[] bindvars = new AMQShortString[]{a1,a2}; 
    boolean is_durable = true; 
    /* 
    Destination queue = new AMQAnyDestination(new AMQShortString("onms2"), 
               new AMQShortString("direct"), 
               new AMQShortString("Simon"), 
               true,   
               true,   
               new AMQShortString(""), 
               false,  
               bindvars); 
    */ 

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    Destination queue = new AMQAnyDestination("onms2/Simon"); 
    //Destination queue = new AMQAnyDestination("amqp:OpenNMSExchange/Taylor; {create: always}"); 
    //Destination queue = new AMQAnyDestination("OpenNMSExchange; {create: always}"); 
    MessageProducer producer = session.createProducer(queue); 

    ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage(); 
    m.setIntProperty("Id", 987654321); 
    m.setStringProperty("name", "WidgetSimon"); 
    m.setDoubleProperty("price", 0.99); 

    List<String> colors = new ArrayList<String>(); 
    colors.add("red"); 
    colors.add("green"); 
    colors.add("white"); 
    m.add(colors); 

    Map<String,Double> dimensions = new HashMap<String,Double>(); 
    dimensions.put("length",10.2); 
    dimensions.put("width",5.1); 
    dimensions.put("depth",2.0); 
    m.add(dimensions); 

    List<List<Integer>> parts = new ArrayList<List<Integer>>(); 
    parts.add(Arrays.asList(new Integer[] {1,2,5})); 
    parts.add(Arrays.asList(new Integer[] {8,2,5})); 
    m.add(parts); 

    Map<String,Object> specs = new HashMap<String,Object>(); 
    specs.put("colours", colors); 
    specs.put("dimensions", dimensions); 
    specs.put("parts", parts); 
    m.add(specs); 

    producer.send((Message)m); 
    System.out.println("Sent: " + m); 
    connection.close(); 
} 

} 

When the extended form of AMQAnyDestination is used, which works, the debug log looks like this:

163 [main] INFO org.apache.qpid.client.AMQConnection - Connection 1 now connected from /127.0.0.1:43298 to localhost/127.0.0.1:5672 
163 [main] DEBUG org.apache.qpid.client.AMQConnection - Are we connected:true 
163 [main] DEBUG org.apache.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-91 
166 [main] DEBUG org.apache.qpid.client.AMQDestination - Based on direct://onms2/Simon/?routingkey='Simon'&exclusive='true'&autodelete='true' the selected destination syntax is BURL 
169 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - Write channel open frame for channel id 1 
184 [main] DEBUG org.apache.qpid.client.AMQSession - Created session:[email protected] 
186 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (1028176102)Method frame received: [ChannelOpenOkBody] 
189 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (1028176102)Method frame received: [BasicQosOkBodyImpl: ] 
195 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - MessageProducer [email protected] using publish mode : ASYNC_PUBLISH_ALL 
206 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content body frames to direct://onms2/Simon/?routingkey='Simon'&exclusive='true'&autodelete='true' 
206 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content header frame to direct://onms2/Simon/?routingkey='Simon'&exclusive='true'&autodelete='true' 
207 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 67... 

When it fails with the shorter syntax, the debug log looks like this:

149 [main] INFO org.apache.qpid.client.AMQConnection - Connection 1 now connected from /127.0.0.1:36940 to localhost/127.0.0.1:5672 
149 [main] DEBUG org.apache.qpid.client.AMQConnection - Are we connected:true 
149 [main] DEBUG org.apache.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-91 
153 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - Write channel open frame for channel id 1 
169 [main] DEBUG org.apache.qpid.client.AMQSession - Created session:[email protected] 
170 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (472294496)Method frame received: [ChannelOpenOkBody] 
171 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (472294496)Method frame received: [BasicQosOkBodyImpl: ] 
179 [main] DEBUG org.apache.qpid.client.AMQDestination - Based on onms2/Simon the selected destination syntax is ADDR 
182 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - supportsIsBound: false 
182 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - supportsIsBound: false 
182 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - supportsIsBound: false 
184 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (472294496)Method frame received: [ExchangeDeclareOkBodyImpl: ] 
184 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - MessageProducer [email protected] using publish mode : ASYNC_PUBLISH_ALL 
195 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content body frames to 'onms2'/'Simon'; None 
195 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content header frame to 'onms2'/'Simon'; None 
196 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 90... 
196 [main] DEBUG org.apache.qpid.framing.FieldTable - {Id=[INT: 987654321], name=[LONG_STRING: WidgetSimon], price=[DOUBLE: 0.99], qpid.subject=[LONG_STRING: Simon], JMS_QPID_DESTTYPE=[INT: 2]} 
198 [main] DEBUG org.apache.qpid.client.AMQSession - Closing session: [email protected] 
198 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolSession - closeSession called on protocol session for session 1 
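The clearest difference between the two logs is the line in which the client reports which destination syntax it selected (BURL for the extended form, ADDR for the short form). As a minimal sketch for checking this in a longer log, the snippet below extracts that value from a log line. The class and method names are hypothetical and not part of Qpid; only the log wording is taken from the output above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Qpid) for scanning client debug logs. */
final class SyntaxLogScan {

    // Matches the AMQDestination debug line shown in the logs above.
    private static final Pattern SELECTED =
            Pattern.compile("Based on (.+) the selected destination syntax is (\\w+)");

    /** Returns the syntax name (for example BURL or ADDR) if the line reports one. */
    static Optional<String> selectedSyntax(String logLine) {
        Matcher matcher = SELECTED.matcher(logLine);
        return matcher.find() ? Optional.of(matcher.group(2)) : Optional.empty();
    }

    public static void main(String[] args) {
        String shortForm = "179 [main] DEBUG org.apache.qpid.client.AMQDestination - "
                + "Based on onms2/Simon the selected destination syntax is ADDR";
        // Prints the syntax the client chose for the short address form.
        System.out.println(selectedSyntax(shortForm).orElse("not reported"));
    }
}
```

Lines that do not contain the message yield an empty result, so the same method can be applied to every line of a log file.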

Ideally I need the shorter syntax to work, because that is what another application I use to publish messages over AMQP relies on.

I suspect there is something wrong with the syntax I am using to define the address, but I cannot see what it is.

I have tried:

amqp:onms2/Simon
address:onms2/Simon

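I have confirmed that the RabbitMQ configuration is correct by testing with two separate Java clients using Qpid, and also with Perl (using net_amqp) and Python (using pika). So I do not think the broker configuration is the problem.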

Any guidance appreciated.

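Edit: I found some extra configuration parameters on the Qpid website that I had missed. When I configure the address as follows, it works!

onms3/Simon; {'create':'always','node':{'type':'topic'} }

In detail: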

<name> [/<subject> ] ; { 
create: always | sender | receiver | never, 
delete: always | sender | receiver | never, 
assert: always | sender | receiver | never, 
mode: browse | consume, 
node: { 
type: queue | topic, 
durable: True | False, 
x-declare: { ... <declare-overrides> ... }, 
x-bindings: [<binding_1>, ... <binding_n>] 
}, 
link: { 
name: <link-name>, 
durable: True | False, 
reliability: unreliable | at-most-once | at-least-once | exactly-once, 
x-declare: { ... <declare-overrides> ... }, 
x-bindings: [<binding_1>, ... <binding_n>], 
x-subscribe: { ... <subscribe-overrides> ... } 
} 
} 

Simon

1 Answer

Edit: I found some extra configuration parameters on the Qpid website that I had missed. When I configure the address as follows, it works!

onms3/Simon; {'create':'always','node':{'type':'topic'} } 

In detail:

<name> [/<subject> ] ; { 
create: always | sender | receiver | never, 
delete: always | sender | receiver | never, 
assert: always | sender | receiver | never, 
mode: browse | consume, 
node: { 
type: queue | topic, 
durable: True | False, 
x-declare: { ... <declare-overrides> ... }, 
x-bindings: [<binding_1>, ... <binding_n>] 
}, 
link: { 
name: <link-name>, 
durable: True | False, 
reliability: unreliable | at-most-once | at-least-once | exactly-once, 
x-declare: { ... <declare-overrides> ... }, 
x-bindings: [<binding_1>, ... <binding_n>], 
x-subscribe: { ... <subscribe-overrides> ... } 
} 
} 
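As a minimal sketch of how an address string in this grammar could be assembled in code, the helper below renders a name, an optional subject, and a nested option map. The class and method names are hypothetical and not part of the Qpid API; it handles only string values and nested maps, and it assumes that whitespace inside the braces is not significant.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Qpid): renders "<name>/<subject>; {options}". */
final class AddressSketch {

    /** Renders an option map with quoted keys and values; nested maps recurse. */
    static String renderOptions(Map<String, Object> options) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> entry : options.entrySet()) {
            Object value = entry.getValue();
            String rendered;
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                rendered = renderOptions(nested);
            } else {
                // Assumption: every non-map value is emitted as a quoted string.
                rendered = "'" + value + "'";
            }
            joiner.add("'" + entry.getKey() + "':" + rendered);
        }
        return joiner.toString();
    }

    /** Joins name, optional subject, and options into a single address string. */
    static String address(String name, String subject, Map<String, Object> options) {
        String base = (subject == null || subject.isEmpty()) ? name : name + "/" + subject;
        return options.isEmpty() ? base : base + "; " + renderOptions(options);
    }

    public static void main(String[] args) {
        Map<String, Object> node = new LinkedHashMap<>();
        node.put("type", "topic");
        Map<String, Object> options = new LinkedHashMap<>();
        options.put("create", "always");
        options.put("node", node);
        // Prints: onms3/Simon; {'create':'always','node':{'type':'topic'}}
        System.out.println(address("onms3", "Simon", options));
    }
}
```

The resulting string would then be passed to `new AMQAnyDestination(...)` in the same way as the short address in the question.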

I edited the answer into my own question. Not sure what you mean; please elaborate.

That was the idea. – paisanco

np :-) Thanks for the guidance.