0
我想對Csv文件做簡單的轉換。但是我的程序被卡住了,沒有給出任何輸出,並且在控制檯上它的打印如下所示。無法處理駱駝大文件
22:38:02.001 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.15.2 (CamelContext: camel-1) is shutting down
22:38:02.135 [main] INFO o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 1 routes (timeout 300 seconds)
22:38:02.167 [main] DEBUG o.a.c.i.DefaultExecutorServiceManager - Created new ThreadPool for source: [email protected] with name: ShutdownTask. -> [email protected]c0a65f[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0][ShutdownTask]
22:38:02.173 [Camel (camel-1) thread #1 - ShutdownTask] DEBUG o.a.c.impl.DefaultShutdownStrategy - There are 1 routes to shutdown
22:38:02.177 [Camel (camel-1) thread #1 - ShutdownTask] DEBUG o.a.c.impl.DefaultShutdownStrategy - Route: route1 suspended and shutdown deferred, was consuming from: Endpoint[file:///home/cloudera/Desktop/camelinput/?delay=15m&noop=true]
22:38:02.177 [Camel (camel-1) thread #1 - ShutdownTask] INFO o.a.c.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 300 seconds.
22:38:02.179 [Camel (camel-1) thread #1 - ShutdownTask] DEBUG o.a.c.impl.DefaultShutdownStrategy - There are 1 inflight exchanges:
InflightExchange: [exchangeId=ID-quickstart-cloudera-40574-1441345060577-0-2, fromRouteId=route1, routeId=route1, nodeId=unmarshal1, elapsed=10787, duration=10791]
22:38:05.436 [Camel (camel-1) thread #1 - ShutdownTask] INFO o.a.c.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 299 seconds.
22:38:05.437 [Camel (camel-1) thread #1 - ShutdownTask] DEBUG o.a.c.impl.DefaultShutdownStrategy - There are 1 inflight exchanges:
InflightExchange: [exchangeId=ID-quickstart-cloudera-40574-1441345060577-0-2, fromRouteId=route1, routeId=route1, nodeId=unmarshal1, elapsed=14045, duration=14049]
22:38:08.201 [Camel (camel-1) thread #1 - ShutdownTask] INFO o.a.c.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 298 seconds.
22:38:08.202 [Camel (camel-1) thread #1 - ShutdownTask] DEBUG o.a.c.impl.DefaultShutdownStrategy - There are 1 inflight exchanges:
InflightExchange: [exchangeId=ID-quickstart-cloudera-40574-1441345060577-0-2, fromRouteId=route1, routeId=route1, nodeId=unmarshal1, elapsed=16810, duration=16814]
實際上是相同的程序工作對於小文件,但是當我嘗試用大文件做我得到這個issue.I認爲這可能與線程的問題,請您及時幫我個忙找出問題。 以下是我的計劃
主類
CamelContext myContext = new DefaultCamelContext();
TestRouter myRoute=new TestRouter();
HDFSTransfer hdfsTransfer=new HDFSTransfer();
String copy=hdfsTransfer.copyToLocal("hdfs://localhost:8020", "/user/cloudera/input/CamelTestIn.csv", "/home/cloudera/Desktop/camelinput/");
boolean flag=false;
if("SUCCESS".equals(copy)){
myContext.addRoutes(myRoute);
// Launching the context
myContext.start();
// Pausing to let the route do its work
Thread.sleep(10000);
myContext.stop();
flag=true;
}
if(flag){
hdfsTransfer.moveFile("hdfs://localhost:8020", "file:/home/cloudera/Desktop/camelout/out.csv", "/user/cloudera/output/");
}
}
RouterBuilder類 CsvDataFormat CSV =新CsvDataFormat();
from("file:/home/cloudera/Desktop/camelinput/?noop=true&delay=15m")
.unmarshal(csv)
.convertBodyTo(List.class)
.process(new Processor() {
@Override
public void process(Exchange msg) throws Exception {
List<List<String>> data = (List<List<String>>) msg.getIn().getBody();
for (List<String> line : data) {
// Checks if column two contains text STANDARD
// and alters its value to DELUXE.
// System.out.println("line "+line);
/*if("Aug-04".equalsIgnoreCase(line.get(6))){
line.set(6, "04-August");
}*/
}
}
}).marshal(csv)
.to("file:/home/cloudera/Desktop/camelout/?fileName=out.csv")
.log("done.").end();
}
在此先感謝
嗨@Claus易卜生,謝謝你的答覆。正如你所說我增加了睡眠時間,正如你所說,它導致內存不足的問題。然後什麼是簡單和精確的解決方案。請幫助我。 – prasad