我有多個客戶端將文件發送到服務器。對於一組數據,有兩個文件包含有關該數據的信息,每個數據都具有相同的名稱。當收到一個文件時,服務器會向我的隊列發送一條消息,其中包含文件路徑,文件名,客戶端ID以及文件的「類型」(全部文件擴展名相同,但有兩種類型, 「給他們打電話A和B)。使用駱駝來聚合相同郵件頭的郵件
一組數據的兩個文件具有相同的文件名。只要服務器收到了兩個文件,我就需要啓動一個合併這兩個文件的程序。目前,我有一些看起來像這樣:
from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args=");
我在哪裏卡住是標題(「CamelFileName」),更具體如何匯聚的作品。
當completionSize設置爲2時,它只是吸收所有消息並將它們存儲在某個數據結構中,直到與第一條消息匹配的第二條消息通過?另外,header()是否需要一個特定的值?我有多個客戶端,所以我正考慮在頭文件中包含客戶端ID和文件名,但是我又不知道是否必須給出具體的值。我也不知道我是否可以使用正則表達式。
任何想法或提示將是超級有用。 謝謝
編輯: 這裏是我現在有一些代碼。根據我對這個問題的描述以及對所選答案的評論看起來是否準確(除了我沒有複製過的近括號)?
public static void main(String args[]) throws Exception{
CamelContext c = new DefaultCamelContext();
c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
//ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
//c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
c.addRoutes(new RouteBuilder() {
public void configure() {
from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success");
}
});
c.start();
while (true) {
System.out.println("Waiting on messages to come through for camel");
Thread.sleep(2 * 1000);
}
//c.stop();
}
private static class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null)
return newExchange;
// and here is where combo stuff goes
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
boolean oldSet = oldBody.contains("set");
boolean newSet = newBody.contains("set");
boolean oldFlow = oldBody.contains("flow");
boolean newFlow = newBody.contains("flow");
if ((oldSet && newFlow) || (oldFlow && newSet)) {
//they match so return new exchange with info so extractor can be started with exec
String combined = oldBody + "\n" + newBody + "\n";
newExchange.getIn().setBody(combined);
return newExchange;
}
else {
// no match so do something....
return null;
}
}
}
所以AggregationStrategy用於以某種方式組合消息,然後將其提供給'exec'? 「兩個交易所」是什麼意思?我剛剛開始使用Camel,所以對我來說一切都還是相當新的np,歡迎來到駱駝......而且Aggregator是相當複雜/強大的工具......簡而言之,Exchange包裝一條消息(來自你的隊列等) ,所以如果你在等待2條消息(由fileName關聯),你最終會得到2個交換組合在一起...然後你需要將相關數據從這些交換到exec(fileName,clientIDs, etc)... – thaweatherman
雖然文件名可以從不同的客戶端得到,但是我可以通過ID和文件名關聯起來嗎? – thaweatherman
對不起,我錯誤地使用了你的用例...正確的,如果你想爲每個客戶端有不同的組合,你應該把聚合表達式作爲fileName + clientID的組合。 –