2012-04-23 64 views
1

我已經完成了一些駱駝小項目,但有一件事我很難理解如何處理大數據(這不適合內存)在駱駝路線消費時。Apache Camel:來自數據庫的數據的GBs路由到JMS端點

我有一個數據庫,其中包含一些我希望使用camel進行路由的GB值的數據。顯然,將所有數據讀入內存不是一種選擇。

如果我這樣做是一個獨立的應用程序,我會有通過數據分頁並將塊發送到我的JMS入口點的代碼。我想使用駱駝,因爲它提供了一個很好的模式。如果我正在使用文件,我可以使用streaming()調用。

也應該使用camel-sql/camel-jdbc/camel-jpa或使用一個bean從我的數據庫中讀取數據。

希望大家仍然和我在一起。我更熟悉Java DSL,但會很感激人們可以提供的任何幫助/建議。

更新:2-MAY-2012

所以我有一些時間玩這個周圍,我想我實際上做濫用生產者的概念,使我可以使用它在一條路線上。

public class MyCustomRouteBuilder extends RouteBuilder { 

    public void configure(){ 
     from("timer:foo?period=60s").to("mycustomcomponent:TEST"); 

     from("direct:msg").process(new Processor() { 
       public void process(Exchange ex) throws Exception{ 
        System.out.println("Receiving value" : + ex.getIn().getBody()); 
       } 
     } 
    } 

} 

我的製作人看起來像下面這樣。爲了清楚起見,我沒有包含CustomEndpoint或CustomComponent,因爲它看起來很薄。

public class MyCustomProducer extends DefaultProducer{ 

    Endpoint e; 
    CamelContext c; 

    public MyCustomProducer(Endpoint epoint){ 
      super(endpoint) 
      this.e = epoint; 
      this.c = e.getCamelContext(); 
    } 

    public void process(Exchange ex) throws Exceptions{ 

     Endpoint directEndpoint = c.getEndpoint("direct:msg"); 
     ProducerTemplate t = new DefaultProducerTemplate(c); 

     // Simulate streaming operation/chunking of BIG data. 
     for (int i=0; i <20 ; i++){ 
      t.start(); 
      String s ="Value " + i ;     
      t.sendBody(directEndpoint, value) 
      t.stop();   
     } 
    } 
} 

首先上面看起來不太乾淨。看起來最簡潔的方法是通過計劃的石英工作來填充一個jms隊列(代替direct:msg),然後我的駱駝路由消耗,這樣我就可以在我的駱駝內收到的消息大小有更大的靈活性管道。不過,我非常喜歡設置基於時間的激活作爲路線一部分的語義。

有沒有人有任何想法做到這一點的最佳方式。

回答

0

在我的理解,所有你需要做的是:

from("jpa:SomeEntity" + 
    "?consumer.query=select e from SomeEntity e where e.processed = false" + 
    "&maximumResults=150" + 
    "&consumeDelete=false") 
.to("jms:queue:entities"); 

maximumResults定義你每次查詢多少實體得到限制。

當你完成一個實體實例的處理時,你需要設置它e.processed = true;persist()它,以便實體不會再被處理。要做到這一點

一種方式是與@Consumed註釋:

class SomeEntity { 
    @Consumed 
    public void markAsProcessed() { 
     setProcessed(true); 
    } 
} 

另一件事,你需要小心的是你如何把它發送到隊列之前序列化的實體。您可能需要在from和to之間使用濃縮器

+0

在行動書中看到駱駝後,我想我可以在此之前給予或使用自定義組件來獲得更多的控制權。轉換應該是一個問題,因爲我將使用自定義轉換器/轉換。 – K2J 2012-04-24 20:14:54

+0

我發現有用的是序列化實體的主鍵,並讓處理邏輯檢索實際的實體。因此,您不必執行任何複雜的自定義轉換。最後,一個實體通過其密鑰進行區分。 – 2012-04-25 08:56:45

+0

你也可以嘗試使用Apache Nifi,它似乎更適合這個用例。 – Pushkin 2017-11-28 01:45:56