2012-10-29 34 views
4

我想配置以下使用Spring Integration的結合反應:配置Spring集成聚合從RabbitMQ的扇出交換

  1. 將消息發送到一個通道。
  2. 請將此消息發佈到與n位消費者交換的兔子扇出(pub/sub)。
  3. 每個消費者提供一個響應消息。
  4. 在Spring集成將這些響應返回給原始客戶端之前,它們會對這些響應進行聚合。

我有這幾個問題至今...

  1. 我爲了設置apply-sequence="true"財產使用發佈 - 訂閱通道,使的correlationID,sequenceSize &的sequenceNumber屬性被設置。這些屬性正在被DefaultAmqpHeaderMapper扔掉。 DEBUG headerName=[correlationId] WILL NOT be mapped

  2. 即使在扇出交換中註冊了2個隊列,sequenceSize屬性也只設置爲1。這大概意味着這些消息會過早地從聚合器中釋放出來。我預計這是因爲我濫用發佈 - 訂閱 - 頻道以使用apply-sequence="true",並且正確地說只有一個用戶int-amqp:outbound-gateway

我出站的Spring配置如下:

<int:publish-subscribe-channel id="output" apply-sequence="true"/> 

<int:channel id="reply"> 
    <int:interceptors> 
     <int:wire-tap channel="logger"/> 
    </int:interceptors> 
</int:channel> 

<int:aggregator input-channel="reply" method="combine"> 
    <bean class="example.SimpleAggregator"/> 
</int:aggregator> 

<int:logging-channel-adapter id="logger" level="INFO"/> 

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/> 

<int-amqp:outbound-gateway request-channel="output" 
            amqp-template="amqpTemplate" exchange-name="fanout-exchange" 
            reply-channel="reply"/> 

我RabbitMQ的配置如下:

<rabbit:connection-factory id="connectionFactory" /> 

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" /> 

<rabbit:admin connection-factory="connectionFactory" /> 

<rabbit:queue name="a-queue"/> 
<rabbit:queue name="b-queue"/> 

<rabbit:fanout-exchange name="fanout-exchange"> 
    <rabbit:bindings> 
     <rabbit:binding queue="a-queue" /> 
     <rabbit:binding queue="b-queue" /> 
    </rabbit:bindings> 
</rabbit:fanout-exchange> 

消費者看起來是這樣的:

<int:channel id="input"/> 

<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/> 

<bean id="listenerService" class="example.ListenerService"/> 

<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/> 

任何我懷疑,建議會很好我有棍子某處錯誤的結束......根據Gary的意見

新的出站Spring配置:

<int:channel id="output"/> 

<int:header-enricher input-channel="output" output-channel="output"> 
    <int:correlation-id expression="headers['id']" /> 
</int:header-enricher> 

<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" /> 

<int-amqp:outbound-gateway request-channel="output" 
            amqp-template="amqpTemplate" exchange-name="fanout-exchange" 
            reply-channel="reply" 
            mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/> 

<int:channel id="reply"/> 

<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2"> 
    <bean class="example.SimpleAggregator"/> 
</int:aggregator> 

回答

3

的問題是,法定文書不知道扇出交換的拓撲結構。

解決這個問題的最簡單方法是使用自定義的發佈策略

release-strategy-expression="size() == 2" 
的聚合

(假設2扇出)。所以,你不需要序列大小;你能避免「濫用」的發佈/訂閱一個頭,富集通道...

<int:header-enricher input-channel="foo" output-channel="bar"> 
     <int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" /> 
    </int:header-enricher> 

你能避免使用消息ID,這已經是唯一的創建一個新的UUID ...

<int:correlation-id expression="headers['id']" /> 

最後,您可以通過添加

mapped-request-headers="correlationId" 

您AMQP端點通過的correlationID頭AMQP。

+0

感謝加里,這讓我更進一步,現在的問題是,我的出站網關似乎沒有等待消息的響應。消費者正在收到消息(來自扇出交換),我可以看到他們都回復了同一個rabbitmq隊列(來自DEBUG),但我沒有收到發件人的回覆。我不得不將amqp *添加到mapped-request-headers屬性中,否則標準amqp頭文件將丟失。 –

+0

我已使用新配置更新了原始帖子。 –

+0

我沒有注意到您使用的是網關 - 網關只處理對請求的單個回覆。您需要使用出站適配器來發送請求,並使用入站適配器來接收回復。您需要手動填充標題,以便2位消費者的入站網關知道如何回覆。 –

0

即使這個問題是3歲,我會迴應它,因爲我有同樣的問題。

Spring Integration具有Scatter-Gather的實現,聽起來非常像您的原始問題。

下面是相關的部分Spring Documentation

這是一個複合終點,其目的是將消息發送到收件人 和彙總結果....

此前,該模式可以使用分立元件配置,這種增強帶來更方便的配置。

的ScatterGatherHandler是請求 - 應答端點,結合了 PublishSubscribeChannel(或RecipientListRouter)和 AggregatingMessageHandler。請求消息發送到分散的通道,ScatterGatherHandler等待來自聚合器的回覆發送到outputChannel。