2017-08-24 14 views
0

我需要讀取一個包含csv文件(一個或多個文件)的目錄。我使用SpringBoot的駝峯,我需要將任何文件完全處理(沒有錯誤)移動到OUT目錄,但如果最後一個「到」路線失敗(拋出異常),我需要將文件移動到REFUSED目錄。駱駝讀取目錄,並通過完成任務結果選擇一個目的地

當我嘗試我的代碼,駱駝去不定式循環,繼續處理永遠同一個文件..

24/08/2017 16:27:57.070 ERROR [Camel (camel-1) thread #0 - file://src/main/resources/data] - org.apache.camel.processor.DefaultErrorHandler: Failed delivery for (MessageId: ID-CAD1652-39380-1503584865077-0-33 on ExchangeId: ID-CAD1652-39380-1503584865077-0-34). Exhausted after delivery attempt: 1 caught: com.cadit.exceptions.FileNotEvaluableException: Error: file tipo sconosciuto 

Message History 
--------------------------------------------------------------------------------------------------------------------------------------- 
RouteId    ProcessorId   Processor                  Elapsed (ms) 
[route2   ] [route2   ] [file://src/main/resources/data?idempotent=false&move=OUT%2FVB%2F    ] [  10] 
[route2   ] [unmarshal1  ] [unmarshal[[email protected]]   ] [   1] 
[route2   ] [to1    ] [bean:myCsvHandler?method=doHandleCsvDataVB         ] [   8] 

Stacktrace 
--------------------------------------------------------------------------------------------------------------------------------------- 
com.cadit.exceptions.FileNotEvaluableException: Error: file tipo sconosciuto 
    at com.cadit.handlers.MyCsvHandler.doHandleCsvDataVB(MyCsvHandler.java:172) 
    at com.cadit.handlers.MyCsvHandler$$FastClassBySpringCGLIB$$f4b8f70b.invoke(<generated>) 
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) 
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:721) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99) 
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:282) 
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:656) 
    at com.cadit.handlers.MyCsvHandler$$EnhancerBySpringCGLIB$$d81d9e7f.doHandleCsvDataVB(<generated>) 
    at sun.reflect.GeneratedMethodAccessor97.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:458) 
    at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:289) 
    at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:262) 
    at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:178) 
    at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41) 
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145) 
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77) 
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542) 
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) 
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:120) 
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83) 
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197) 
    at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:460) 
    at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:227) 
    at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:191) 
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:175) 
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:102) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

這是主代碼:

@Component 
public class CamelContextConf extends RouteBuilder{ 

    static final Logger logger = Logger.getLogger(CamelContextConf.class); 

    @Override 
    public void configure() throws Exception { 
     restConfiguration().component("servlet").dataFormatProperty("prettyPrint", "true") ; 
        CsvDataFormat csv = new CsvDataFormat(); 
     csv.setDelimiter(";"); 
     csv.setSkipHeaderRecord(true); 


     from("direct:csvprocessor") 
     .streamCaching() 
     .from("file:src/main/resources/data?move=OUT/VB/") 
     .unmarshal(csv) 
     .to("bean:myCsvHandler?method=doHandleCsvDataVB") 
     .onCompletion().onFailureOnly().to("file:src/main/reources/data/REFUSED").end() 
     .setBody(constant("OK")) 
     .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(200)) 
     .setHeader(Exchange.CONTENT_TYPE, constant("text/html")); 

     logger.info("** Route config ok"); 

    } 

然後在myCsvHandler - >方法doHandleCsvDataVB我明確拋出一個異常來測試 失敗:

@Component 
public class MyCsvHandler { 


    @Inject 
    AFVINCCrudRepository _entityManagerVINC; 

    @Inject 
    AFFileCrudRepository _entityManagerAfFile; 

     @Transactional(propagation = Propagation.REQUIRED , transactionManager="DbTransactionManagerVB") 
    public void doHandleCsvDataVB(List<List<String>> csvData) throws FileNotEvaluableException 
    { 
     //System.out.println("stampo.."); 
      if (csvData.size() > 0){ 
      AfFileEntity afbean = new AfFileEntity(); 
      afbean.setNomeFile("test"); 
      afbean.setDataImport(new java.sql.Timestamp(System.currentTimeMillis())); 
      afbean.setTipoFile("M"); 
      afbean.setAfStatoFlusso("I"); 

      _entityManagerAfFile.save(afbean); 

      long pkfile = afbean.getId(); 
      System.out.println("pkfile: " + pkfile); 

      int i = 1; VincEntity vincBean = new VincEntity(); 
      System.out.println(csvData.size()); 
      for (List<String> rows : csvData){ 

       .. 
        _entityManagerVINC.save(..); 
       } 

        throw new FileNotEvaluableException("Il file non è nè una ..."); 

      } 

     } 

    } 

} 

保存方法循環並繼續將數據保存在數據庫中。 出現了什麼問題?

非常感謝。

+0

.from(「file:src/main/resources/data?move = OUT/VB /&moveFailed = REFUSED」)?? – robyp7

回答

0

在你的代碼中,你不能從另一個裏面。

from("direct:csvprocessor") 
.streamCaching() 
.from("file:src/main/resources/data?move=OUT/VB/") 

你的第二個from("file:...")應改爲使用濃縮器。 http://camel.apache.org/content-enricher.html

不確定,但在您的doHandleCsvDataVB()方法中,我看不到try() catch()塊。您將直接從for循環引發異常。

+0

豐富只爲生產者,pollEnricher一次輪詢一個文件? ..但我必須輪詢該導演中的所有文件,而不是隻有一個。 – robyp7

+0

您能否在高層解釋流程的目的是什麼?你想要什麼功能?接收REST請求,然後繼續從一個ftp文件夾中查詢所有文件? –

相關問題