2013-05-29 87 views
3

要求開發mule流程,它並行調用3個不同的同步服務,然後彙總每個響應並將其發回給調用者。Mule中的並行處理:獲取正確響應的問題

我已按照文檔和How to make parallel outbound calls中提到的方法遵循叉聯接方法。 我的配置文件看起來像如下:

  <flow name="fork"> 
      <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response"> 
      <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" 
       value="2" /> 
      <all enableCorrelation="IF_NOT_SET"> 
       <async> 
        <set-property propertyName="MULE_CORRELATION_SEQUENCE" 
         value="1" /> 
        <flow-ref name="parallel1" /> 
       </async> 
       <async> 
        <set-property propertyName="MULE_CORRELATION_SEQUENCE" 
         value="2" /> 
        <flow-ref name="parallel2" /> 
       </async> 
      </all> 
     </flow> 

     <sub-flow name="parallel1"> 
      <logger level="INFO" message="parallel1: processing started" /> 
      <!- Transformation payload --> 
      <http:outbound-endpoint address="..." 
       exchange-pattern="request-response" /> 
      <logger level="INFO" message="parallel1: processing finished" /> 
      <flow-ref name="join" /> 
     </sub-flow> 

     <sub-flow name="parallel2"> 
      <logger level="INFO" message="parallel2: processing started" /> 
      <!- Transformation payload --> 
      <http:outbound-endpoint address="..." 
       exchange-pattern="request-response" /> 
      <logger level="INFO" message="parallel2: processing finished" /> 
      <flow-ref name="join" /> 
     </sub-flow> 

     <sub-flow name="join"> 
      <collection-aggregator timeout="6000" 
       failOnTimeout="true" /> 
      <combine-collections-transformer /> 
      <logger level="INFO" message="Continuing processing of: #[message.payloadAs(java.lang.String)]" /> 
      <set-payload value="Soap XML Response"/> 
     </sub-flow> 

我能夠確認,直到「加入」子流工作正常,但響應不回來爲「肥皂XML響應」。 響應是相同的初始SOAP請求。

我該如何讓這個線程等待子流處理完成,並且無論「join」子流返回什麼都返回響應。

回答

5

上面的帖子中的fork連接看起來不錯。這裏的問題是在連接之後無法捕獲有效載荷並將其帶回主流。

由於對並行異步進行的調用,主流繼續而不等待連接輸出。

我修改了流程來解決此問題。現在流將有一個處理器等待回覆,並讀取加入的輸出寫入http變換器。

<flow name="fork"> 
     <http:inbound-endpoint host="localhost" port="8090" path="mainPath" exchange-pattern="request-response"> 
      <!-- To get back the response after the fork-join --> 
     <request-reply timeout="60000"> 
      <jms:outbound-endpoint queue="parallel.processor.queue"> 
       <message-properties-transformer scope="outbound"> 
        <delete-message-property key="MULE_REPLYTO" /> 
       </message-properties-transformer> 
      </jms:outbound-endpoint> 
      <jms:inbound-endpoint queue="join.queue" >  
      </jms:inbound-endpoint> 
     </request-reply>    
    </flow> 

    <flow name="fork_join_flow" > 
     <jms:inbound-endpoint queue="parallel.processor.queue" exchange-pattern="one-way" />   
     <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" 
       value="2" /> 
     <all enableCorrelation="IF_NOT_SET"> 
      <async> 
       <set-property propertyName="MULE_CORRELATION_SEQUENCE" 
        value="1" /> 
       <flow-ref name="parallel1" /> 
      </async> 
      <async> 
       <set-property propertyName="MULE_CORRELATION_SEQUENCE" 
        value="2" /> 
       <flow-ref name="parallel2" /> 
      </async> 
     </all> 
    </flow> 

    <sub-flow name="parallel1"> 
     <logger level="INFO" message="parallel1: processing started" /> 
     <!- Transformation payload --> 
     <http:outbound-endpoint address="..." 
      exchange-pattern="request-response" /> 
     <logger level="INFO" message="parallel1: processing finished" /> 
     <flow-ref name="join" /> 
    </sub-flow> 

    <sub-flow name="parallel2"> 
     <logger level="INFO" message="parallel2: processing started" /> 
     <!- Transformation payload --> 
     <http:outbound-endpoint address="..." 
      exchange-pattern="request-response" /> 
     <logger level="INFO" message="parallel2: processing finished" /> 
     <flow-ref name="join" /> 
    </sub-flow> 

    <sub-flow name="join"> 
     <collection-aggregator timeout="6000" 
      failOnTimeout="true" /> 
     <combine-collections-transformer /> 
     <logger level="INFO" message="Continuing processing of: #[message.payloadAs(java.lang.String)]" /> 
     <set-payload value="Soap XML Response"/> 
     <jms:outbound-endpoint queue="join.queue">    
     </jms:outbound-endpoint> 
    </sub-flow> 

希望這會有所幫助。

+0

非常感謝。它運行良好。但是,即使我對Web服務進行同步調用,是否也必須創建jms連接器和隊列?有沒有其他模式可以使用?@ user1760178 –

+0

從我的knolwedge那是我唯一的方法。 – user1760178

+1

如您所說,如果您需要使用JMS,請僅僅提供一個附註。您可以使用VM傳輸而不是JMS,具體取決於您需要的功能。基本的比較可以在這裏找到:http://ricric.com/blog/?p=302 –

相關問題