2017-04-10 121 views

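Error message was not delivered: why is the failed message null?

I implemented a Spring Integration flow with a cron poller; its goal is to initialize directories and MongoDB collections. If the initializer flow throws a RuntimeException, the framework sends a message to the errorChannel. The error flow picks up that message, but then fails itself with the following error: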

2017-04-07 06:25:00.484 | | | | [taskScheduler-8] | DEBUG | com.objectway.bacco.integration.flow.InitializerIntegrationFlowConfiguration | cleanTmp | INITIALIZATION - Directory bacco created 
2017-04-07 06:25:04.857 | | | | [taskScheduler-9] | ERROR | org.springframework.integration.handler.LoggingHandler | handleMessageInternal | org.springframework.messaging.MessagingException: The path [/tmp/bacco/bank_ftp/00082] does not denote a properly accessible directory. 
    at org.springframework.integration.file.DefaultDirectoryScanner.listFiles(DefaultDirectoryScanner.java:83) 
    at org.springframework.integration.file.FileReadingMessageSource.scanInputDirectory(FileReadingMessageSource.java:293) 
    at org.springframework.integration.file.FileReadingMessageSource.receive(FileReadingMessageSource.java:272) 
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:191) 
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:59) 
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134) 
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:175) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:224) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:57) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:176) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:173) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:330) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55) 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324) 
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

2017-04-07 06:25:04.882 | | | | [taskScheduler-9] | INFO | com.objectway.bacco.service.logging.logback.LogServiceLogback | info | null | null | null | ERROR - Processing error 
2017-04-07 06:25:04.930 | | | | [taskScheduler-9] | WARN | org.springframework.integration.channel.MessagePublishingErrorHandler | handleError | Error message was not delivered. 
org.springframework.integration.transformer.MessageTransformationException: Failed to transform Message; nested exception is org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1007E:(pos 0): Property or field 'headers' cannot be found on null 
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:95) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:227) 
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:176) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) 
    at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:85) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:58) 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324) 
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1007E:(pos 0): Property or field 'headers' cannot be found on null 
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96) 
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:90) 
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:89) 
    ... 34 common frames omitted 
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1007E:(pos 0): Property or field 'headers' cannot be found on null 
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:220) 
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:94) 
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:81) 
    at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:51) 
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:87) 
    at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:120) 
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:267) 
    at com.objectway.bacco.integration.component.analytics.ProcessTransformer.evaluateExpression(ProcessTransformer.java:118) 
    at com.objectway.bacco.integration.component.analytics.ProcessTransformer.retrieveValueByKey(ProcessTransformer.java:115) 
    at com.objectway.bacco.integration.component.analytics.ProcessTransformer.transform(ProcessTransformer.java:57) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:113) 
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:129) 
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:49) 
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:347) 
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88) 
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131) 
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330) 
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:166) 
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:317) 
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:155) 
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:93) 

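The documentation says that when exceptions occur within a scheduled poller task's execution, those exceptions will be wrapped in ErrorMessages and sent to the errorChannel as well. Is there a problem with ErrorMessages? I found the ErrorMessage class, but no class named ErrorMessages.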

Thanks

EDIT: the failedMessage is null. How is that possible?

@Bean 
public MessageSource<?> dateMessageSource() { 
    MethodInvokingMessageSource source = new MethodInvokingMessageSource(); 
    source.setObject(new SystemTime()); 
    source.setMethodName("getTime"); 
    return source; 
} 

@Bean 
public IntegrationFlow initIntegrationFlow(ProcessTransformer processTransformer, 
              MongoCleanerService mongoCleanerService, 
              ControlComponentService controlComponentService, 
              DirectoryService directoryService, 
              @Value("${output.directory}") String outputDirectory, 
              @Qualifier(DAILY_POLLER) PollerSpec pollerSpec) { 
    return IntegrationFlows.from(dateMessageSource(), c -> c.poller(pollerSpec)) 
      .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.headerFunction(CORRELATION_ID, message -> "00000") 
        .headerFunction(NETWORK, message -> "-ALL-") 
        .headerFunction(GROUP, message -> "ALL") 
        .headerFunction(DOMAIN, (message) -> (INIT))) 
      .transform(new FlowLogger() 
        .level(INFO) 
        .expression("'INITIALIZATION - Start Initializer, drop collections and temporary files'") 
        .logService(logService)) 
      .transform(processTransformer) 
      .handle((payload, headers) -> { 
       controlComponentService.manageComponent("ftpExportFlow", "start"); 
       mongoCleanerService.clean(); 
       directoryService.cleanTmp(outputDirectory); 
       return new InitializationMessage(); 
      }) 
      .channel(INITIALIZED_CHANNEL) 
      .get(); 
} 

@Bean 
public IntegrationFlow endWithErrorFlowBean(ProcessRepository processRepository, 
              AnalyticsService analyticsService, 
              LogService logService) { 
    return IntegrationFlows 
      .from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) 
      .transform(new FlowLogger().level(INFO).expression("'ERROR - Processing error'") 
        .logService(logService)) 
      .transform(new ProcessTransformer() 
        .processRepository(processRepository) 
        .analyticsService(analyticsService) 
        .processStatusType(ProcessStatusType.PROCESS_WITH_ERROR)) 
      .transform(source -> { 
       controlBus.sendCommand("@ftpExportFlow.stop()"); 
       return processRepository.findByProcessStatusType(ProcessStatusType.PROCESS_WITH_ERROR); 
      }) 
      .filter(processes -> ((List<Process>) processes).size() == 1) 
      .channel(REPORT_INPUT_CHANNEL) 
      .get(); 
} 

That's a typo; it should be 'ErrorMessage's. You need to show your configuration, and the complete stack trace, for anyone to be able to help with this.

You should always add a comment when you provide more information. We don't get notified of edits.

OK :) thanks – MeewU

Answer


The failure happens before a message has been created, hence the null failedMessage:

2017-04-07 06:25:04.857 | | | | [taskScheduler-9] | ERROR | org.springframework.integration.handler.LoggingHandler | handleMessageInternal | org.springframework.messaging.MessagingException: The path [/tmp/bacco/bank_ftp/00082] does not denote a properly accessible directory.

You generally only get a failedMessage property when the error occurs in the downstream flow, after a message has been created.
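
To make the distinction concrete, here is a minimal plain-Java model of one poll cycle. It is only a sketch: Msg, PollError, poll and headerOrDefault are hypothetical stand-ins invented for illustration, not Spring Integration classes, and the real framework does considerably more. The model shows the two cases discussed here: a failure inside the source leaves nothing to attach, while a failure in the handler carries the message that was being processed. It also shows a null-safe header lookup of the kind an error flow would need to avoid a secondary exception.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified stand-ins for illustration only; NOT Spring Integration classes.
final class PollerErrorModel {

    /** Hypothetical message type: a payload plus headers. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Hypothetical error wrapper; failedMessage is null when no message existed yet. */
    static final class PollError {
        final Throwable cause;
        final Msg failedMessage;

        PollError(Throwable cause, Msg failedMessage) {
            this.cause = cause;
            this.failedMessage = failedMessage;
        }
    }

    /**
     * One poll cycle: ask the source for a message, then hand it to the handler.
     * Returns the error an error channel would receive, or empty on success.
     */
    static Optional<PollError> poll(Supplier<Msg> source, Consumer<Msg> handler) {
        Msg message;
        try {
            message = source.get();
        } catch (RuntimeException e) {
            // Failure inside the source: no message was created, so none is attached.
            return Optional.of(new PollError(e, null));
        }
        if (message == null) {
            return Optional.empty(); // nothing to do this cycle
        }
        try {
            handler.accept(message);
            return Optional.empty();
        } catch (RuntimeException e) {
            // Failure downstream: the message being handled is attached.
            return Optional.of(new PollError(e, message));
        }
    }

    /** Null-safe header lookup, so the error flow does not fail a second time. */
    static Object headerOrDefault(PollError error, String name, Object fallback) {
        if (error.failedMessage == null) {
            return fallback;
        }
        return error.failedMessage.headers.getOrDefault(name, fallback);
    }

    public static void main(String[] args) {
        PollError sourceFailure = poll(
                () -> { throw new IllegalStateException("bad directory"); },
                m -> { }).get();
        PollError handlerFailure = poll(
                () -> new Msg("trigger", Map.of("domain", "INIT")),
                m -> { throw new IllegalStateException("clean failed"); }).get();

        System.out.println(sourceFailure.failedMessage == null);
        System.out.println(headerOrDefault(sourceFailure, "domain", "UNKNOWN"));
        System.out.println(headerOrDefault(handlerFailure, "domain", "UNKNOWN"));
    }
}
```

In this model the source failure yields a PollError with no message, so reading a header directly would fail much like the 'headers' cannot be found on null error in the second stack trace; the guarded lookup returns the fallback instead.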

Thanks for your answer! But doesn't this line, IntegrationFlows.from(dateMessageSource(), c -> c.poller(pollerSpec)), create a message? The exception is thrown after the handle step. – MeewU

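No; the poller polls the message source. If there is a file to process, it sends a message; if there is no file, it does nothing. If the message source throws an exception (such as your bad directory), then there is no message to include in the 'ErrorMessage'. You are getting a secondary exception when trying to process that message.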

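Sorry, I didn't explain myself well. The goal of the init flow is to clean the environment (Mongo and the tmp directory). The poller fires every morning at 6:25 (Monday to Friday), and dateMessageSource() creates a new message with the date as payload. It isn't a file poll; it's just a trigger. The services called in the handle step are what throw the exception. – MeewU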