2016-06-08 51 views
0

我有一個包含大約5.2M行的大型CSV文件。我想解析這個文件並將數據插入到數據庫中。我爲此使用apache駱駝。使用Apache Camel插入大型CSV文件時的GC問題

路線是相當容易的(簡化本示例)

from("file:work/customer/").id("customerDataRoute") 
.split(body().tokenize("\n")).streaming() 
.parallelProcessing() 
.unmarshal(bindyCustomer) 
.split(body()) 
.process(new CustomerProcessor()) 
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)"); 

bindyCustomer是CSV文件和 CustomerProcessor一個BindyCsvDataFormat是返回Bindy客戶對象的數據作爲對象的陣列的處理器爲SQL插入。實際的對象有39個字段(以上簡化)。

這對第一個800.000到1.000.000行都可以,但是它會停下來。

我用JVisualVM和Visual GC插件監視了駱駝實例,我可以看到老一代填滿了,當它達到最大值時,整個系統停止運行,但不會崩潰。 在這一點上,老一代已經滿員了,伊甸園的空間幾乎已經滿了,兩個倖存者空間都是空的(因爲它不能將任何東西移動到我猜想的老一代)。

那麼這裏有什麼問題?這看起來像是Camel SQL組件中的內存泄漏。 數據主要存儲在ConcurrentHashMap對象中。

當我拿出SQL組件時,老一代幾乎沒有填充。

我正在使用駱駝2.15.1將嘗試使用2.17.1看看是否解決了這個問題。

更新:我試過駱駝2.17.1(同樣的問題),我試圖用java.sql.Statement.executeUPdate在Java中插入插入。有了這個選項,我設法插入了大約2.6 M行,但隨後它也停止了。有趣的是我沒有收到內存錯誤。它只是停下來。

回答

1

好吧我想通了這裏出了什麼問題。基本上讀取部分與插入部分相比太快。這個例子有點過於簡單,因爲在閱讀和插入之間有一個seda隊列(因爲我必須對內容做出選擇,而這個內容在示例中沒有顯示)。 但即使沒有seda隊列,它從來沒有完成。我意識到當我殺死駱駝時發生了什麼錯誤,並得到一條消息,說明還有幾千條機上消息。

因此,當插入端無法跟上時,並行處理讀取沒有意義。

from("file:work/customer/").id("customerDataRoute") 
     .onCompletion().log("Customer data processing finished").end() 
     .log("Processing customer data ${file:name}") 
     .split(body().tokenize("\n")).streaming() //no more parallel processing 
     .choice() 
      .when(simple("${body} contains 'HEADER TEXT'")) //strip out the header if it exists 
      .log("Skipping first line") 
      .endChoice() 
     .otherwise() 
      .to("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true") 
      .endChoice(); 


from("seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true") 
      .unmarshal(bindyCustomer) 
      .split(body()) 
      .process(new CustomerProcessor()).id("CustomProcessor") //converts one Notification into an array of values for the SQL insert 
.to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)"); 

我在SEDA隊列上定義了一個大小(默認情況下它不受限制),並在隊列滿時調用線程塊。

seda:processCustomer?size=40&concurrentConsumers=20&blockWhenFull=true 

並行處理是通過在SEDA隊列上使用20個併發消費者完成的。請注意,出於什麼原因,您在調用路由時也必須指定隊列大小(不僅在您定義它的位置)。

現在的內存消耗是最小的,它插入500萬記錄沒有問題。

1

我沒有測試你的代碼,但是,我確實注意到你的第二個拆分語句不是流式。我建議嘗試一下。如果你有太多並行的工作流,GC可能會在你釋放資源之前填滿。 SQL語句需要的時間可能是什麼讓GC得到了太多的建立時間,因爲你正在對主要處理進行並行化。

from("file:work/customer/").id("customerDataRoute") 
    .split(body().tokenize("\n")).streaming().parallelProcessing() 
     .unmarshal(bindyCustomer) 
     .split(body()).streaming() //Add a streaming call here and see what happens 
      .process(new CustomerProcessor()) 
      .to("sql:INSERT INTO CUSTOMER_DATA(`FIELD1`,`FIELD2`) VALUES(#,#)"); 
+0

感謝您的提示。我試過了,但並沒有解決問題。 .unmarchal(bindyCustomer)只返回一個元素的數組,所以在這種情況下流應該不會有太大的區別。你能想到其他可能會出錯的東西嗎?我將嘗試在Java中執行插入以查看是否解決了問題。 – Ben

+0

嗯,我有一些粗略的猜測。你能夠添加id標籤到你的路線,然後打開你的JConsole來確認所有線程是「掛起」的嗎? –

+0

這條路線已經作爲一個ID(customerDataRoute)或者你指的是別的嗎? – Ben