我能夠用兔的Java API的做寫Java程序如下:與兔AMQP的Spring集成「客戶端發送郵件 - >服務器接收與在返回隊列返回味精 - >客戶端得到相關消息」
客戶端發送消息在兔MQ交換/隊列相關性id(說UUID - 「348a07f5-8342-45ed-b40b-d44bfd9c4dde」)。
服務器接收該消息。
服務器通過具有相同關聯Id的Rabbit MQ交換機/隊列發送響應消息 - 「348a07f5-8342-45ed-b40b-d44bfd9c4dde」。
客戶端接收到的相關的消息僅在同一線程中爲1
下面是Send.java和Recv.java使用兔的API。我需要幫助來轉換此示例以使用Spring AMQP集成,尤其是在步驟4中接收部分。我正在尋找類似於可以使用關聯Id篩選消息的receive方法。
Send.java:
import java.util.UUID;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class Send {
private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
String message = "Hello World!";
String cslTransactionId = UUID.randomUUID().toString();
BasicProperties properties = (new BasicProperties.Builder())
.correlationId(cslTransactionId)
.replyTo(RESPONSE_QUEUE).build();
channel.basicPublish("", REQUEST_QUEUE, properties, message.getBytes());
System.out.println("Client Sent '" + message + "'");
Channel responseChannel = connection.createChannel();
responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
responseChannel.basicConsume(RESPONSE_QUEUE, false, consumer);
String correlationId = null;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String responseMessage = new String(delivery.getBody());
correlationId = delivery.getProperties().getCorrelationId();
System.out.println("Correlation Id:" + correlationId);
if (correlationId.equals(cslTransactionId)) {
System.out.println("Client Received '" + responseMessage + "'");
responseChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
break;
}
}
channel.close();
connection.close();
}
}
Recv.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Recv {
private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(REQUEST_QUEUE, true, consumer);
String correlationId = null;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
correlationId = delivery.getProperties().getCorrelationId();
System.out.println("Correlation Id:" + correlationId);
System.out.println("Server Received '" + message + "'");
if (correlationId != null)
break;
}
String responseMsg = "Response Message";
Channel responseChannel = connection.createChannel();
responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
BasicProperties properties = (new BasicProperties.Builder())
.correlationId(correlationId).build();
channel.basicPublish("", RESPONSE_QUEUE, properties,responseMsg.getBytes());
System.out.println("Server Sent '" + responseMsg + "'");
channel.close();
connection.close();
}
}
運行由加里提供的Java配置之後,我想移動的配置,以XML格式的服務器端添加的偵聽。下面是XML配置:
server.xml中
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean
id="serviceListenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queues" ref="requestQueue"/>
<property name="messageListener" ref="messageListenerAdaptor"/>
</bean>
<bean id="messageListenerAdaptor"
class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<property name="delegate" ref="pojoListener" />
</bean>
<bean
id="pojoListener"
class="PojoListener"/>
<bean
id="replyListenerContainer"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="queues" ref="replyQueue"/>
<property name="messageListener" ref="fixedReplyQRabbitTemplate"/>
</bean>
<!-- Infrastructure -->
<rabbit:connection-factory
id="connectionFactory"
host="localhost"
username="guest"
password="guest"
cache-mode="CHANNEL"
channel-cache-size="5"/>
<rabbit:template
id="fixedReplyQRabbitTemplate"
connection-factory="connectionFactory"
exchange="fdms.exchange"
routing-key="response.key"
reply-queue="RESPONSE.QUEUE">
<rabbit:reply-listener/>
</rabbit:template>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue id="requestQueue" name="REQUEST.QUEUE" />
<rabbit:queue id="replyQueue" name="RESPONSE.QUEUE" />
<rabbit:direct-exchange name="fdms.exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="RESPONSE.QUEUE" key="response.key" />
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
SpringReceive.java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringReceive {
/**
* @param args
*/
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("cslclient.xml");
SimpleMessageListenerContainer serviceListenerContainer = context.getBean("serviceListenerContainer", SimpleMessageListenerContainer.class);
serviceListenerContainer.start();
}
}
爲什麼''是不夠的,你:http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/amqp.html# AMQP的出站網關? –