我想配置以下使用Spring Integration的結合反應:配置Spring集成聚合從RabbitMQ的扇出交換
- 將消息發送到一個通道。
- 請將此消息發佈到與n位消費者交換的兔子扇出(pub/sub)。
- 每個消費者提供一個響應消息。
- 在Spring集成將這些響應返回給原始客戶端之前,它們會對這些響應進行聚合。
我有這幾個問題至今...
我爲了設置
apply-sequence="true"
財產使用發佈 - 訂閱通道,使的correlationID,sequenceSize &的sequenceNumber屬性被設置。這些屬性正在被DefaultAmqpHeaderMapper
扔掉。DEBUG headerName=[correlationId] WILL NOT be mapped
即使在扇出交換中註冊了2個隊列,sequenceSize屬性也只設置爲1。這大概意味着這些消息會過早地從聚合器中釋放出來。我預計這是因爲我濫用發佈 - 訂閱 - 頻道以使用
apply-sequence="true"
,並且正確地說只有一個用戶int-amqp:outbound-gateway
。
我出站的Spring配置如下:
<int:publish-subscribe-channel id="output" apply-sequence="true"/>
<int:channel id="reply">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:aggregator input-channel="reply" method="combine">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
<int:logging-channel-adapter id="logger" level="INFO"/>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"/>
我RabbitMQ的配置如下:
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>
<rabbit:fanout-exchange name="fanout-exchange">
<rabbit:bindings>
<rabbit:binding queue="a-queue" />
<rabbit:binding queue="b-queue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
消費者看起來是這樣的:
<int:channel id="input"/>
<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>
<bean id="listenerService" class="example.ListenerService"/>
<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>
任何我懷疑,建議會很好我有棍子某處錯誤的結束......根據Gary的意見
新的出站Spring配置:
<int:channel id="output"/>
<int:header-enricher input-channel="output" output-channel="output">
<int:correlation-id expression="headers['id']" />
</int:header-enricher>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"
mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>
<int:channel id="reply"/>
<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
感謝加里,這讓我更進一步,現在的問題是,我的出站網關似乎沒有等待消息的響應。消費者正在收到消息(來自扇出交換),我可以看到他們都回復了同一個rabbitmq隊列(來自DEBUG),但我沒有收到發件人的回覆。我不得不將amqp *添加到mapped-request-headers屬性中,否則標準amqp頭文件將丟失。 –
我已使用新配置更新了原始帖子。 –
我沒有注意到您使用的是網關 - 網關只處理對請求的單個回覆。您需要使用出站適配器來發送請求,並使用入站適配器來接收回復。您需要手動填充標題,以便2位消費者的入站網關知道如何回覆。 –