2015-04-21 33 views
1

我們在我們的項目中使用了rabbit MQ和Spring Integration。每條消息都有一個傳送模式,標題,屬性和有效載荷部分。 我們希望添加具有值2(任何整數)的優先級的屬性,具有「測試消息3」的有效載荷並將消息發佈到名爲OES的隊列。請參閱截圖。在RabbitMQ發佈消息時發送MessageProperties [priority = anyInteger]

Adding properties[priority=3] in rabbitmq message

如何在下面的出站信道適配器(Spring集成)添加messageproperties即)優先= 2(或任何值)。我知道我們可以通過添加到「映射請求標題」來添加「標題」,但我想添加屬性。在「出站通道適配器」中沒有爲MessageProperties定義的屬性。有沒有辦法解決這個問題。

我們對有效載荷沒有任何問題,它已經開始了。我們只想添加優先級= 2(任何值)的MessageProperties。如何在出站通道適配器中添加該功能(不需要硬編碼,應該是通用的)?

<!-- the mapped-request-headers should be symmetric with 
    the list on the consumer side defined in consumerbeans.consumerHeaderMapper() --> 
<int-amqp:outbound-channel-adapter id="publishingAmqpAdapter" 
    channel="producer-processed-event-channel" 
    amqp-template="amqpPublishingTemplate" 
    exchange-name="events_forwarding_exchange" 
    routing-key-expression="headers['routing-path']" 
    mapped-request-headers="X-CallerIdentity,routing-path,content-type,route_to*,event-type,compression-state,STANDARD_REQUEST_HEADERS" 
/> 

其他配置:

<!-- chain routes and transforms the ApplicationEvent into a json string --> 
<int:chain id="routingAndTransforming" 
    input-channel="producer-inbound-event-channel" 
    output-channel="producer-routed-event-channel"> 
    <int:transformer ref="outboundMessageTracker"/> 
    <int:transformer ref="messagePropertiesTransformer"/> 
    <int:transformer ref="eventRouter"/> 
    <int:transformer ref="eventToJsonTransformer"/> 
</int:chain> 

<int:transformer id="messagePayloadCompressor" 
    input-channel="compress-message-payload" 
    output-channel="producer-processed-event-channel" 
    ref="payloadCompressor"/> 

@Configuration("amqpProducerBeans") 
@ImportResource(value = "classpath:com/apple/store/platform/events/si/event-producer-flow.xml") 
public class AmqpProducerBeans { 

    @Bean(name = { "amqpPublishingTemplate" }) 
     public AmqpTemplate amqpTemplate() { 
      logger.debug("creating amqp publishing template"); 
      RabbitTemplate rabbitTemplate = new RabbitTemplate(producerConnectionFactory()); 
      SimpleMessageConverter converter = new SimpleMessageConverter(); 
      // following needed for retry logic 
      converter.setCreateMessageIds(true); 
      rabbitTemplate.setMessageConverter(converter); 
      return rabbitTemplate; 
     } 

/*Other code commented */ 

} 

其他代碼:

import org.springframework.integration.Message; 
import org.springframework.integration.annotation.Transformer; 
import org.springframework.integration.message.GenericMessage; 

public class PayloadCompressor { 

    @Transformer 
    public Message<byte[]> compress(Message<String> message){ 
     /* some code commented */ 

     Map<String, Object> headers = new HashMap<String, Object>(); 
     headers.putAll(message.getHeaders()); 
     headers.remove("compression-state"); 
     headers.put("compression-state", CompressionState.COMPRESSED); 
     Message<byte[]> compressedMessage = new GenericMessage<byte[]>(compressedPayload, headers); 
     return compressedMessage; 


    } 

如果我們不使用Spring集成,那麼我們就可以使用下面的channel.basicPublish方式併發送MessageProperties。

ConnectionFactory factory = new ConnectionFactory(); 
factory.setVirtualHost("/"); 
factory.setHost("10.102.175.30"); 
factory.setUsername("rahul"); 
factory.setPassword("rahul"); 
factory.setPort(5672); 
Connection connection = factory.newConnection(); 
System.out.println("got connection "+connection); 
Channel channel = connection.createChannel(); 
MessageProperties msgproperties= new MessageProperties() ; 
MessageProperties.BASIC.setPriority(3); 
// set Messageproperties with priority 
    String exchangeName = "HeaderExchange"; 
      String routingKey = "testkey"; 
      //routingkey 
      byte[] messageBodyBytes = "Message having priority value 3".getBytes(); 
      channel.basicPublish(exchangeName, 
                           routingKey, 
                           true, 
                           msgproperties.BASIC, 
                           messageBodyBytes); 

請讓我知道,如果你需要更多的細節。

回答

1

屬性已自動映射 - 請參閱header mapper

只需使用<header-enricher/>來設置適當的標題並將其映射到正確的屬性。在優先級的情況下,amqp專用標頭常量的常量爲here,請參閱here

+0

它的工作。加里羅素,你太棒了。非常感謝。 – Raghu