2013-06-21 54 views
1

我有多個客戶端將文件發送到服務器。對於一組數據,有兩個文件包含有關該數據的信息,每個數據都具有相同的名稱。當收到一個文件時,服務器會向我的隊列發送一條消息,其中包含文件路徑,文件名,客戶端ID以及文件的「類型」(全部文件擴展名相同,但有兩種類型, 「給他們打電話A和B)。使用駱駝來聚合相同郵件頭的郵件

一組數據的兩個文件具有相同的文件名。只要服務器收到了兩個文件,我就需要啓動一個合併這兩個文件的程序。目前,我有一些看起來像這樣:

from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args="); 

我在哪裏卡住是標題(「CamelFileName」),更具體如何匯聚的作品。

當completionSize設置爲2時,它只是吸收所有消息並將它們存儲在某個數據結構中,直到與第一條消息匹配的第二條消息通過?另外,header()是否需要一個特定的值?我有多個客戶端,所以我正考慮在頭文件中包含客戶端ID和文件名,但是我又不知道是否必須給出具體的值。我也不知道我是否可以使用正則表達式。

任何想法或提示將是超級有用。 謝謝

編輯: 這裏是我現在有一些代碼。根據我對這個問題的描述以及對所選答案的評論看起來是否準確(除了我沒有複製過的近括號)?

public static void main(String args[]) throws Exception{ 
     CamelContext c = new DefaultCamelContext(); 
     c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false")); 
     //ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 
     //c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); 
     c.addRoutes(new RouteBuilder() { 
      public void configure() { 
       from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success"); 
      } 
     }); 
     c.start(); 
     while (true) { 
      System.out.println("Waiting on messages to come through for camel"); 
      Thread.sleep(2 * 1000); 
     } 
     //c.stop(); 
    } 

    private static class MyAggregationStrategy implements AggregationStrategy { 

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
      if (oldExchange == null) 
       return newExchange; 
      // and here is where combo stuff goes 
      String oldBody = oldExchange.getIn().getBody(String.class); 
      String newBody = newExchange.getIn().getBody(String.class); 
      boolean oldSet = oldBody.contains("set"); 
      boolean newSet = newBody.contains("set"); 
      boolean oldFlow = oldBody.contains("flow"); 
      boolean newFlow = newBody.contains("flow"); 
      if ((oldSet && newFlow) || (oldFlow && newSet)) { 
       //they match so return new exchange with info so extractor can be started with exec 
       String combined = oldBody + "\n" + newBody + "\n"; 
       newExchange.getIn().setBody(combined); 
       return newExchange; 
      } 
      else { 
       // no match so do something.... 
       return null; 
      } 
     } 
    } 

回答

3

你必須提供一個AggregationStrategy來定義你想怎麼交易所合併...

,如果你只對文件名感興趣,並接收正好2的交流,那麼你可以使用UseLatestAggregationStrategy只通過最新的交易所一次2已'彙總'...

說,它聽起來像你需要保留兩個交易所(一個爲每個clientId),所以你可以傳遞信息到'exec'步驟...如果是這樣,您可以使用內置的聚合策略enab將交易所合併到GroupedExchange持有者中通過groupExchanges選項引導...或者指定一個自定義的AggregationStrategy來組合它們,但是您可以選擇。只需要記住,你的「高管」一步需要處理您決定使用什麼聚合結構...

看到這些單元測試的例子:

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java

+0

所以AggregationStrategy用於以某種方式組合消息,然後將其提供給'exec'? 「兩個交易所」是什麼意思?我剛剛開始使用Camel,所以對我來說一切都還是相當新的np,歡迎來到駱駝......而且Aggregator是相當複雜/強大的工具......簡而言之,Exchange包裝一條消息(來自你的隊列等) ,所以如果你在等待2條消息(由fileName關聯),你最終會得到2個交換組合在一起...然後你需要將相關數據從這些交換到exec(fileName,clientIDs, etc)... – thaweatherman

+0

雖然文件名可以從不同的客戶端得到,但是我可以通過ID和文件名關聯起來嗎? – thaweatherman

+0

對不起,我錯誤地使用了你的用例...正確的,如果你想爲每個客戶端有不同的組合,你應該把聚合表達式作爲fileName + clientID的組合。 –