2014-05-05 61 views
0

我有兩個Mule實例訂閱了隊列上的同一主題,但我只希望每個消息都消耗一次。在主/從或故障轉移關係中使用Mule實例

我可以通過將消息彙集到唯一隊列並從那裏處理來實現此目的,但爲了降低操作複雜性,我希望將每個Mule實例上運行的消息使用者流設置爲一個實例。

這將類似於ActiveMQ故障轉移設置(一次只運行一個實例,空閒實例僅在運行實例未能響應時喚醒)或主授權/從屬安排,我將授予其中一個實例指揮其他人。或者像虛擬機傳輸那樣是實例間而不是實例內部。

這需要在沒有任何Mule Enterprise Edition組件(僅依賴Mule Community Edition組件)的情況下使用Mule 3.4版來完成。或3.5。

回答

0

我無法找到一個方便的內置方式來做到這一點。相反,我認爲每個騾子實例將在一個單獨的盒子運行,並且使用server.host值來確定哪個實例做了處理:

<mule xmlns:redis="http://www.mulesoft.org/schema/mule/redis" 
    xmlns="http://www.mulesoft.org/schema/mule/core" 
    version="CE-3.5.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.mulesoft.org/schema/mule/redis http://www.mulesoft.org/schema/mule/redis/3.4/mule-redis.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd"> 

    <!-- define redis instance --> 
    <redis:config name="redis-instance" /> 

    <flow name="topicConsumer"> 
     <!-- listen to redis channel (topic) --> 
     <redis:subscribe config-ref="redis-instance"> 
      <redis:channels> 
       <redis:channel>topic.channel</redis:channel> 
      </redis:channels> 
     </redis:subscribe> 

     <!-- save original payload (message from Redis) --> 
     <set-session-variable variableName="redisPayload" value="#[payload]" /> 

     <!-- select processor --> 
     <flow-ref name="topicProcessorSelector"/> 

     <choice> 
      <when expression="#[sessionVars['subscriberProcessor'] == server.host]"> 
       <logger level="INFO" message="processing on #[server.host]"/> 
      </when> 
      <otherwise> 
       <logger level="INFO" message="take no action"/> 
      </otherwise> 
     </choice> 
    </flow> 

    <flow name="topicProcessorSelector" processingStrategy="synchronous"> 
     <!-- get key --> 
     <redis:get config-ref="redis-instance" 
      key="topic_processor"/> 

     <!-- if no key, then add this instance as the processor --> 
     <choice> 
      <when expression="#[payload instanceof org.mule.transport.NullPayload]"> 
       <!-- set key --> 
       <redis:set config-ref="redis-instance" 
        key="topic_processor" 
        expire="10" 
        value="#[server.host]"> 
       </redis:set> 
       <set-session-variable variableName="subscriberProcessor" value="#[server.host]" /> 
      </when> 
      <otherwise> 
       <!-- use existing key --> 
       <byte-array-to-string-transformer/> 
       <set-session-variable variableName="subscriberProcessor" value="#[payload]" /> 
      </otherwise> 
     </choice> 
    </flow> 
</mule>