我試圖以持久的方式處理Mule中的XML對象隊列,這些對象已經從原始xml文件中分離出來,然後使用「選擇」組件進行路由。 選擇組件的每個支路導致帶有不同隊列的AMQP端點。每個隊列的另一端是應該讀取隊列的另一個Mule流,對XML執行一些操作並將其作爲回覆返回。所有的AMQP端點都被設置爲請求響應。Mule中的同步AMQP連接器的問題
該流程似乎正常工作,直到它將某些內容放到AMQP隊列中,但之後只是立即繼續,而不是等待消息。
這也是由遠程流程產生的,它似乎正確地讀取隊列,但似乎立即回覆,然後繼續正確處理它。最後,它應該回復該消息,但似乎沒有這樣做。
這裏有一些代碼段的情況下,任何人都可以指出我要去哪裏錯了...
主路由器流量
<amqp:connector name="connector.amqp.mule.default" doc:name="AMQP Connector" validateConnections="true"/>
<flow name="routerFlow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/configtest" doc:name="HTTP" allowedMethods="POST" />
<mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
<splitter expression="#[xpath3('//cfg:Configuration', message.payload, 'NODESET')]" doc:name="Splitter" enableCorrelation="ALWAYS"/>
<mulexml:dom-to-xml-transformer doc:name="DOM to XML"/>
<set-variable variableName="firstElement" value="#[xpath3('name(/*/*[1])', message.payload, 'STRING')]" doc:name="GetConfigItem" />
<choice doc:name="Route by Config Item Type">
<when expression="#[flowVars['firstElement'] == 'dir:DirectoryObject']">
<amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="DirectoryObject Queue" connector-ref="connector.amqp.mule.default"/>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<logger message=""Back from DirectoryQueue with " + #[payload]" level="INFO" doc:name="Logger"/>
</when>
<when expression="#[flowVars['firstElement'] == 'gpo:GroupPolicyObject']">
<amqp:outbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="GroupPolicy Queue" connector-ref="connector.amqp.mule.default"/>
</when>
</choice>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<collection-aggregator timeout="60000" failOnTimeout="true" doc:name="Collection Aggregator"/>
目錄隊列流
<flow name="directoryobjectFlow">
<amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationDirectoryObject" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<http:request config-ref="HTTP_Request_Configuration" path="/xml" method="POST" responseTimeout="60000" doc:name="HTTP">
<http:request-builder>
<http:header headerName="Content-Type" value="application/xml"/>
<http:header headerName="Accept" value="application/xml"/>
</http:request-builder>
</http:request>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<logger level="INFO" doc:name="Logger" message="Exit DirectoryFlow with #[payload]"/>
</flow>
和集團政策隊列(目前設置爲顯示它什麼都不做)
<flow name="amiab-esb-grouppolicyFlow">
<amqp:inbound-endpoint exchangeName="configuration-exchange" exchangeType="fanout" exchangeDurable="true" queueName="configurationGroupPolicy" queueDurable="true" routingKey="configuration.public.*" responseTimeout="10000" exchange-pattern="request-response" doc:name="AMQP" connector-ref="connector.amqp.mule.default"/>
<set-payload value="#[message.payloadAs(java.lang.String)]" doc:name="Convert to String"/>
<set-payload doc:name="Set Payload" value="Nothing" />
<logger level="INFO" doc:name="Logger" message="Exit GroupPolicy with #[payload]"/>
</flow>
我對騾很陌生,只是要抓住它,所以我會非常感謝任何想法或見解。
謝謝!
你好Nich,我是amqp connecto的維護者。我應該能夠提供幫助,但是我不完全理解這個問題。你能改說嗎? –
HI Victor,基本上我正在嘗試使用AMQP(RabbitMQ)隊列來執行流之間的持久請求/響應,但似乎無法使其同步工作,並等待來自HTTP請求的答案並將其作爲答覆發回。我使用的是AMQP 3.4.4.201409101602,這是通過雲連接器站點提供的。 –
還有更多更新的版本,但似乎有問題,更新不會發布到社區用戶的交流。我們明天可能會發佈一個新版本。我會回來的回購地址。 –