2014-05-05 52 views
1

我能夠使用 RabbitMQ 的 Java API 編寫出實現如下流程的 Java 程序,現在想用 Spring 與 RabbitMQ AMQP 的集成來實現它:「客戶端發送消息 -> 服務器接收並在返回隊列上返回消息 -> 客戶端取得相關聯的消息」

  1. 客戶端把帶有關聯 Id(例如 UUID「348a07f5-8342-45ed-b40b-d44bfd9c4dde」)的消息發送到 RabbitMQ 交換機/隊列。

  2. 服務器接收該消息。

  3. 服務器通過具有相同關聯Id的Rabbit MQ交換機/隊列發送響應消息 - 「348a07f5-8342-45ed-b40b-d44bfd9c4dde」。

  4. 客戶端僅在與步驟 1 相同的線程中接收相關聯的消息。

下面是使用 RabbitMQ Java API 的 Send.java 和 Recv.java。我需要幫助把這個示例轉換爲使用 Spring AMQP 集成的版本,尤其是步驟 4 中的接收部分。我正在尋找一個可以按關聯 Id 篩選消息的 receive 方法。

Send.java:

import java.util.UUID; 

import com.rabbitmq.client.AMQP.BasicProperties; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.QueueingConsumer; 

/**
 * Request/reply client built directly on the RabbitMQ Java client.
 *
 * <p>Publishes one request to {@code REQUEST.QUEUE} tagged with a freshly
 * generated correlation id, then blocks on {@code RESPONSE.QUEUE} until a
 * reply carrying the same correlation id arrives.
 */
public class Send {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    /** Explicit charset so both ends agree regardless of the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

    /**
     * Sends a single request and waits for its correlated reply.
     *
     * @param argv command-line arguments; not used
     * @throws Exception if the broker cannot be reached or any channel operation fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);

            // Declare the reply queue before publishing so that it exists by the
            // time the server answers.
            Channel responseChannel = connection.createChannel();
            responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);

            String message = "Hello World!";
            String cslTransactionId = UUID.randomUUID().toString();
            BasicProperties properties = new BasicProperties.Builder()
                    .correlationId(cslTransactionId)
                    .replyTo(RESPONSE_QUEUE)
                    .build();

            channel.basicPublish("", REQUEST_QUEUE, properties, message.getBytes(UTF_8));
            System.out.println("Client Sent '" + message + "'");

            // The consumer is bound to the same channel it is registered on.
            QueueingConsumer consumer = new QueueingConsumer(responseChannel);
            responseChannel.basicConsume(RESPONSE_QUEUE, false, consumer);

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String correlationId = delivery.getProperties().getCorrelationId();
                System.out.println("Correlation Id:" + correlationId);

                // Compare from the known non-null side: a reply without a
                // correlation id must be skipped, not crash the client.
                if (cslTransactionId.equals(correlationId)) {
                    String responseMessage = new String(delivery.getBody(), UTF_8);
                    System.out.println("Client Received '" + responseMessage + "'");
                    // multiple=false: acknowledge only our own reply. Replies that
                    // did not match stay unacknowledged and are returned to the
                    // queue when this channel closes, instead of being discarded.
                    responseChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    break;
                }
            }
        } finally {
            // Closing the connection also closes every channel opened on it.
            connection.close();
        }
    }
}

Recv.java

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 
import com.rabbitmq.client.AMQP.BasicProperties; 

/**
 * Request/reply server built directly on the RabbitMQ Java client.
 *
 * <p>Consumes from {@code REQUEST.QUEUE} until a message carrying a
 * correlation id arrives, then publishes one reply with that same
 * correlation id to {@code RESPONSE.QUEUE} and exits.
 */
public class Recv {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    /** Explicit charset so both ends agree regardless of the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

    /**
     * Handles a single correlated request and sends its reply.
     *
     * @param argv command-line arguments; not used
     * @throws Exception if the broker cannot be reached or any channel operation fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();

            channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            QueueingConsumer consumer = new QueueingConsumer(channel);
            // autoAck=true: requests are acknowledged on delivery.
            channel.basicConsume(REQUEST_QUEUE, true, consumer);

            String correlationId = null;
            while (correlationId == null) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), UTF_8);
                correlationId = delivery.getProperties().getCorrelationId();
                System.out.println("Correlation Id:" + correlationId);
                System.out.println("Server Received '" + message + "'");
            }

            String responseMsg = "Response Message";
            // One channel is enough: declare the reply queue and publish on the
            // channel that is already open rather than opening a second one that
            // is only used for the declaration.
            channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
            BasicProperties properties = new BasicProperties.Builder()
                    .correlationId(correlationId)
                    .build();

            // NOTE: the reply always goes to RESPONSE_QUEUE; the request's
            // replyTo property is not consulted.
            channel.basicPublish("", RESPONSE_QUEUE, properties, responseMsg.getBytes(UTF_8));

            System.out.println("Server Sent '" + responseMsg + "'");
        } finally {
            // Closing the connection also closes every channel opened on it.
            connection.close();
        }
    }
}

在運行了 Gary 提供的 Java 配置之後,我想把配置遷移爲 XML 格式,首先從服務器端添加監聽器開始。下面是 XML 配置:

server.xml

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream" 
    xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
     http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd 
     http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd 
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <!-- Spring AMQP wiring for the request/reply example: a listener container
         on REQUEST.QUEUE, a RabbitTemplate configured with RESPONSE.QUEUE as its
         reply queue, and the broker objects (queues, exchange, binding). -->

    <!-- Consumes the requestQueue bean (REQUEST.QUEUE) and hands each message
         to messageListenerAdaptor. -->
    <bean 
     id="serviceListenerContainer" 
     class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> 
      <property name="connectionFactory" ref="connectionFactory" /> 
      <property name="queues" ref="requestQueue"/> 
      <property name="messageListener" ref="messageListenerAdaptor"/> 
    </bean> 

    <!-- Adapter that forwards incoming messages to the pojoListener delegate. -->
    <bean id="messageListenerAdaptor" 
     class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter"> 
     <property name="delegate" ref="pojoListener" /> 
    </bean> 

    <!-- Application handler; the PojoListener class is not part of this file. -->
    <bean 
     id="pojoListener" 
     class="PojoListener"/> 

    <!-- Consumes the replyQueue bean (RESPONSE.QUEUE) with the template itself
         as the message listener.
         NOTE(review): fixedReplyQRabbitTemplate below also declares
         rabbit:reply-listener, which presumably registers its own reply
         container for the same queue; confirm whether both are intended. -->
    <bean 
     id="replyListenerContainer" 
     class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"> 
     <property name="connectionFactory" ref="connectionFactory"/> 
     <property name="queues" ref="replyQueue"/> 
     <property name="messageListener" ref="fixedReplyQRabbitTemplate"/> 
    </bean> 

    <!-- Infrastructure --> 
    <!-- Connection to the broker on localhost with the guest account; channels
         are cached (cache size 5). -->
    <rabbit:connection-factory 
     id="connectionFactory" 
     host="localhost" 
     username="guest" 
     password="guest" 
     cache-mode="CHANNEL" 
     channel-cache-size="5"/> 

    <!-- Template that publishes to fdms.exchange with routing key response.key
         and uses RESPONSE.QUEUE as its fixed reply queue. -->
    <rabbit:template 
     id="fixedReplyQRabbitTemplate" 
     connection-factory="connectionFactory" 
     exchange="fdms.exchange" 
     routing-key="response.key" 
     reply-queue="RESPONSE.QUEUE"> 
     <rabbit:reply-listener/> 
    </rabbit:template> 

    <!-- Admin bean on the same connection factory; presumably what declares the
         queues, exchange and binding below on the broker (confirm). -->
    <rabbit:admin connection-factory="connectionFactory"/> 

    <!-- Queue beans referenced by the two listener containers above. -->
    <rabbit:queue id="requestQueue" name="REQUEST.QUEUE" /> 
    <rabbit:queue id="replyQueue" name="RESPONSE.QUEUE" /> 

    <!-- Durable direct exchange; messages with routing key response.key are
         routed to RESPONSE.QUEUE. -->
    <rabbit:direct-exchange name="fdms.exchange" durable="true" auto-delete="false"> 
     <rabbit:bindings> 
      <rabbit:binding queue="RESPONSE.QUEUE" key="response.key" /> 
     </rabbit:bindings> 
    </rabbit:direct-exchange> 
</beans> 

SpringReceive.java

import org.springframework.amqp.core.Message; 
import org.springframework.amqp.core.MessageBuilder; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 
import org.springframework.context.support.ClassPathXmlApplicationContext; 


/**
 * Server-side launcher: loads the Spring XML context and starts the listener
 * container that serves incoming requests.
 */
public class SpringReceive {

    /**
     * Boots the application context and starts {@code serviceListenerContainer}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // NOTE(review): the context is loaded from cslclient.xml; confirm this is
        // the file that defines the serviceListenerContainer bean.
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("cslclient.xml");

        // The context is intentionally left open so the listener keeps running;
        // the shutdown hook closes it (and with it the containers and broker
        // connections) when the JVM exits.
        context.registerShutdownHook();

        SimpleMessageListenerContainer serviceListenerContainer =
                context.getBean("serviceListenerContainer", SimpleMessageListenerContainer.class);
        serviceListenerContainer.start();
    }
}
+0

爲什麼 `<int-amqp:outbound-gateway>` 對你來說還不夠用?參見:http://docs.spring.io/spring-integration/docs/latest-ga/reference/html/amqp.html#amqp-outbound-gateway –

回答

2

可以使用 RabbitTemplate.sendAndReceive()(或 convertSendAndReceive())並搭配應答監聽器容器(reply listener container,參見文檔);該模板會替你處理消息的關聯。

如果你正在使用 Spring Integration,請使用出站網關(outbound gateway),並搭配適當配置的 RabbitTemplate。

+0

能否請你給我指一個簡單的例子,其中客戶端發送請求,並使用 RabbitTemplate.sendAndReceive() 或 convertSendAndReceive() 等待相關聯的響應?我打算通過 Rabbit 隊列發送一個序列化的對象。 – GRaj

+0

我給你指出的文檔中有一個鏈接,指向一個完整的示例:「一個配置了固定應答隊列的 RabbitTemplate,以及處理請求並返回應答的『遠程』監聽器容器,見此測試用例」(https://github.com/spring-projects/spring-amqp/blob/master/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/JavaConfigFixedReplyQueueTests.java)。它使用 `correlationId` 頭,並且可以與 `Serializable` 有效載荷一起工作。(測試用例還包括響應方。) –

+0

謝謝加里。 Junit工作沒有任何問題。現在我正試圖將Java配置轉換爲XML。從Server監聽器配置開始。 – GRaj

相關問題