2015-05-18 37 views
1

我試圖以持久的方式處理Mule中的XML對象隊列,這些對象已經從原始xml文件中分離出來,然後使用「選擇」組件進行路由。 選擇組件的每個支路導致帶有不同隊列的AMQP端點。每個隊列的另一端是應該讀取隊列的另一個Mule流,對XML執行一些操作並將其作爲回覆返回。所有的AMQP端點都被設置爲請求響應。Mule中的同步AMQP連接器的問題

該流程似乎正常工作,直到它將某些內容放到AMQP隊列中,但之後只是立即繼續,而不是等待消息。

這也是由遠程流程產生的,它似乎正確地讀取隊列,但似乎立即回覆,然後繼續正確處理它。最後,它應該回復該消息,但似乎沒有這樣做。

這裏有一些代碼段的情況下,任何人都可以指出我要去哪裏錯了...

主路由器流量

<amqp:connector name="connector.amqp.mule.default" doc:name="AMQP Connector" validateConnections="true"/> 
    <flow name="routerFlow"> 
    <http:listener config-ref="HTTP_Listener_Configuration" path="/configtest" doc:name="HTTP" allowedMethods="POST" /> 
    <mulexml:dom-to-xml-transformer doc:name="DOM to XML"/> 
    <splitter expression="#[xpath3('//cfg:Configuration', message.payload, 'NODESET')]" doc:name="Splitter" enableCorrelation="ALWAYS"/> 
    <mulexml:dom-to-xml-transformer doc:name="DOM to XML"/> 
    <set-variable variableName="firstElement" value="#[xpath3('name(/*/*[1])', message.payload, 'STRING')]" doc:name="GetConfigItem" /> 
    <choice doc:name="Route by Config Item Type"> 
     <when expression="#[flowVars['firstElement'] == 'dir:DirectoryObject']"> 
      <amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="DirectoryObject Queue" connector-ref="connector.amqp.mule.default"/> 
      <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/> 
      <logger message="&quot;Back from DirectoryQueue with &quot; + #[payload]" level="INFO" doc:name="Logger"/> 
     </when> 
     <when expression="#[flowVars['firstElement'] == 'gpo:GroupPolicyObject']"> 
      <amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="GroupPolicy Queue" connector-ref="connector.amqp.mule.default"/> 
     </when> 
    </choice> 
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/> 
    <collection-aggregator timeout="60000" failOnTimeout="true" doc:name="Collection Aggregator"/> 

目錄隊列流

<flow name="directoryobjectFlow"> 
    <amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/> 
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/> 
    <http:request config-ref="HTTP_Request_Configuration" path="/xml" method="POST" responseTimeout="60000" doc:name="HTTP"> 
     <http:request-builder> 
      <http:header headerName="Content-Type" value="application/xml"/> 
      <http:header headerName="Accept" value="application/xml"/> 
     </http:request-builder> 
    </http:request> 
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/> 
    <logger level="INFO" doc:name="Logger" message="Exit DirectoryFlow with #[payload]"/> 
    </flow> 

和集團政策隊列(目前設置爲顯示它什麼都不做)

<flow name="amiab-esb-grouppolicyFlow"> 
    <amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/> 
    <set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/> 
    <set-payload doc:name="Set Payload" value="Nothing" /> 
    <logger level="INFO" doc:name="Logger" message="Exit GroupPolicy with #[payload]"/> 
    </flow> 

我對騾很陌生,只是要抓住它,所以我會非常感謝任何想法或見解。

謝謝!

+0

你好Nich,我是amqp connecto的維護者。我應該能夠提供幫助,但是我不完全理解這個問題。你能改說嗎? –

+0

HI Victor,基本上我正在嘗試使用AMQP(RabbitMQ)隊列來執行流之間的持久請求/響應,但似乎無法使其同步工作,並等待來自HTTP請求的答案並將其作爲答覆發回。我使用的是AMQP 3.4.4.201409101602,這是通過雲連接器站點提供的。 –

+0

還有更多更新的版本,但似乎有問題,更新不會發布到社區用戶的交流。我們明天可能會發佈一個新版本。我會回來的回購地址。 –

回答

1

請使用今天發佈的最新版本的連接器3.6.2。這將毫無問題地執行請求響應端點的流程。

+0

謝謝維克多,我會給它一個去。 :) –

+0

是的,這很漂亮!現在所有的東西都應該如此。謝謝! –