2011-10-01 46 views
1

在mule 3.x中創建完全自定義聚合器的建議方法是什麼?通過完全自定義的,我的意思是按照我自己的邏輯,不使用相關標識,消息計數等如何在Mule中創建自定義聚合器?

在mulesoft網站上的文件已經過時,說要使用AbstractEventAggregator不中3.X存在:

http://www.mulesoft.org/documentation/display/MULE3USER/Message+Splitting+and+Aggregatio

進一步挖掘,它看起來像這類已3.x中被重命名爲AbstractAggregator:

http://www.mulesoft.org/docs/site/3.2.0/apidocs/org/mule/routing/AbstractAggregator.html

但是,有沒有顯示如何使用這個的例子。上面第一個鏈接中描述的LoanBroker示例實際上使用了一個關聯聚合器(在2.x示例中,我認爲這是文檔所指的內容)。

有一次,有一個抽象類有抽象方法shouldAggregate和doAggregate。這是我想要擴展的類。

回答

4

查看下面TestAggregator的子類化AbstractAggregator的示例。

import org.mule.DefaultMuleEvent; 
import org.mule.DefaultMuleMessage; 
import org.mule.api.MuleContext; 
import org.mule.api.MuleEvent; 
import org.mule.api.store.ObjectStoreException; 
import org.mule.api.transformer.TransformerException; 
import org.mule.routing.AbstractAggregator; 
import org.mule.routing.AggregationException; 
import org.mule.routing.EventGroup; 
import org.mule.routing.correlation.CollectionCorrelatorCallback; 
import org.mule.routing.correlation.EventCorrelatorCallback; 
import org.mule.util.concurrent.ThreadNameHelper; 

import java.util.Iterator; 

public class TestAggregator extends AbstractAggregator 
{ 
    @Override 
    protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext) 
    { 
     return new CollectionCorrelatorCallback(muleContext,false,storePrefix) 
     { 
      @Override 
      public MuleEvent aggregateEvents(EventGroup events) throws AggregationException 
      { 
       StringBuffer buffer = new StringBuffer(128); 

       try 
       { 
        for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();) 
        { 
         MuleEvent event = iterator.next(); 
         try 
         { 
          buffer.append(event.transformMessageToString()); 
         } 
         catch (TransformerException e) 
         { 
          throw new AggregationException(events, null, e); 
         } 
        } 
       } 
       catch (ObjectStoreException e) 
       { 
        throw new AggregationException(events,null,e); 
       } 

       logger.debug("event payload is: " + buffer.toString()); 
       return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent()); 
      } 
     }; 
    } 
} 
+0

謝謝!我會試試這個。我應該重寫CollectionCorrelatorCallback.shouldAggregateEvents()來提供自己的邏輯,還是建議創建自己的實現EventCorrelatorCallback的類? – awynne

+0

你寧願實現你自己的EventCorrelatorCallback,因爲你根本不依賴消息計數(我理解爲:你不會依賴event.getMessage()。getCorrelationGroupSize()來確定要聚合的事件的數量)。 –

+0

我發現EventCorrelatorCallback是由EventCorrelator調用的,它假定按照關聯ID分組事件。所以我認爲我需要通過「人爲地」將它設置在傳入消息的某處來使用correlationId。其他選擇是維護我自己的數據結構來保存事件。有沒有更好的辦法? – awynne