0

我試圖使用Spring與RabbitMQ集成,使用RabbitMQ支持的Spring集成通道。 (由於某種原因,似乎幾乎沒有記錄,這是新的?)。爲什麼Jackson2JsonMessageConverter的AmqpChannelFactoryBean不存儲類型?

要做到這一點,似乎我可以使用AmqpChannelFactoryBean來創建一個頻道。 爲了建立消息轉換,我使用了Jackson2JsonMessageConverter。

當我使用帶有POJO負載的GenericMessage時,它拒絕從Java反序列化它,這基本上是因爲它不知道類型。我本來希望這個類型可以自動放在標題上,但在標題上只有__TypeId__=org.springframework.messaging.support.GenericMessage

在春季啓動我的配置類看起來是這樣的:

@Configuration 
public class IntegrationConfiguration { 

    @Bean 
    public MessageConverter messageConverter() { 
     return new Jackson2JsonMessageConverter(); 
    } 

    @Bean 
    public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory, 
     MessageConverter messageConverter) { 

     AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true); 
     factoryBean.setConnectionFactory(connectionFactory); 
     factoryBean.setQueueName("myActivateOut"); 
     factoryBean.setPubSub(false); 
     factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO); 
     factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT); 
     factoryBean.setMessageConverter(messageConverter); 
     return factoryBean; 
    } 

    @Bean 
    @ServiceActivator(inputChannel = "bsnkActivateOutChannel", autoStartup="true") 
    public MessageHandler mqttOutbound() { 

     return m -> System.out.println(m); 
    } 

} 

發送像這樣做:

private final MessageChannel myActivateOutChannel; 

@Autowired 
public MySender(MessageChannel myActivateOutChannel) { 
    this.myActivateOutChannel = myActivateOutChannel; 
} 

@Override 
public void run(ApplicationArguments args) throws Exception { 
    MyPojo pojo = new MyPojo(); 
    Message<MyPojo> msg = new GenericMessage<>(pojo); 

    myActivateOutChannel.send(msg); 
} 

如果我有了自己的classmapper,事情做的工作,因爲他們應該。但是如果我設置這樣的東西,我將不得不使用許多MessageConverters。 例如

converter.setClassMapper(new ClassMapper() { 

     @Override 
     public void fromClass(Class<?> clazz, MessageProperties properties) { 
     } 

     @Override 
     public Class<?> toClass(MessageProperties properties) { 
      return MyPojo.class; 
     } 

    }); 

我使用這個錯誤嗎?我是否缺少一些配置?還有其他建議嗎?

謝謝! :)

注意:更多的東西,我猜'春季整合'的方式是在每一邊添加一個Spring集成JSON變換器,這意味着每個RabbitMQ隊列還增加兩個額外的直接通道? 這對我來說是錯誤的,因爲我已經有了三倍的通道(6!for進/出),但是mayby可以這麼說,框架應該如何使用?將所有簡單的步驟與直接渠道結合起來? (在這種情況下,我是否保留了RabbitMQ渠道提供的持久性?或者如果我需要,我是否需要一些交易機制?或者它是固有的,直接渠道是如何工作的?)

我也注意到, Spring集成MessageConverter和Spring-amqp MessageConverter。後者是我用過的。其他人會按照我想要的方式工作嗎?快速瀏覽代碼表明它不會將對象類型存儲在消息頭中?

回答

1

在版本4.3之前,amqp支持的通道僅支持可序列化的有效負載;解決方法是使用通道適配器(支持映射)。

INT-3975引入了一個新屬性extractPayload,它導致消息頭被映射到rabbitmq頭並且消息體只是有效載荷而不是序列化的GenericMessage

extractPayload設置爲true應該可以解決您的問題。

+0

乾杯!我明天會試試:) – cranphin

+0

是的,這個作品完美無缺!我會接受這個答案:)在我可以找到這個手冊的任何手冊中,是否有任何明顯的地方?不要抱怨!但是這對我來說太深入瞭解了:) – cranphin

+0

查看[AMQP支持的消息頻道]中的最後一段(https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-頻道)就在**重要**備註之上。 –

相關問題