查看下面TestAggregator
的子類化AbstractAggregator的示例。
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.transformer.TransformerException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.AggregationException;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.CollectionCorrelatorCallback;
import org.mule.routing.correlation.EventCorrelatorCallback;
import org.mule.util.concurrent.ThreadNameHelper;
import java.util.Iterator;
public class TestAggregator extends AbstractAggregator
{
@Override
protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext)
{
return new CollectionCorrelatorCallback(muleContext,false,storePrefix)
{
@Override
public MuleEvent aggregateEvents(EventGroup events) throws AggregationException
{
StringBuffer buffer = new StringBuffer(128);
try
{
for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();)
{
MuleEvent event = iterator.next();
try
{
buffer.append(event.transformMessageToString());
}
catch (TransformerException e)
{
throw new AggregationException(events, null, e);
}
}
}
catch (ObjectStoreException e)
{
throw new AggregationException(events,null,e);
}
logger.debug("event payload is: " + buffer.toString());
return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent());
}
};
}
}
謝謝!我會試試這個。我應該重寫CollectionCorrelatorCallback.shouldAggregateEvents()來提供自己的邏輯,還是建議創建自己的實現EventCorrelatorCallback的類? – awynne
你寧願實現你自己的EventCorrelatorCallback,因爲你根本不依賴消息計數(我理解爲:你不會依賴event.getMessage()。getCorrelationGroupSize()來確定要聚合的事件的數量)。 –
我發現EventCorrelatorCallback是由EventCorrelator調用的,它假定按照關聯ID分組事件。所以我認爲我需要通過「人爲地」將它設置在傳入消息的某處來使用correlationId。其他選擇是維護我自己的數據結構來保存事件。有沒有更好的辦法? – awynne