2017-06-28 92 views
1

我想定期掃描目錄以查找與特定模式匹配的文件併爲找到的文件啓動作業。使用JobLaunchingGateway進行彈簧批處理和彈簧集成拋出DestinationResolutionException

@Configuration 
@EnableBatchProcessing 
public class BatchConfiguration { 

    private final JobBuilderFactory jobBuilderFactory; 
    private final StepBuilderFactory stepBuilderFactory; 

    @Autowired 
    public BatchConfiguration(final JobBuilderFactory jobBuilderFactory, final StepBuilderFactory stepBuilderFactory) { 
     this.jobBuilderFactory = jobBuilderFactory; 
     this.stepBuilderFactory = stepBuilderFactory; 
    } 

    @Bean 
    public FileMessageToJobRequest fileMessageToJobRequestTransformer(final Job myJob) { 
     return new FileMessageToJobRequest(myJob, "input.file.name"); 
    } 

    @Bean 
    public MessageSource<File> fileMessageSource() { 
     final FileReadingMessageSource fileReadingMessageSource = new FileReadingMessageSource(); 
     fileReadingMessageSource.setDirectory(directory)); 
     fileReadingMessageSource.setFilter(new CompositeFileListFilter<>(Arrays.asList(
       new AcceptOnceFileListFilter<>(), 
       new SimplePatternFileListFilter(pattern) 
     ))); 
     return fileReadingMessageSource; 
    } 

    @Bean 
    public IntegrationFlow pollingFlow(final FileMessageToJobRequest fileMessageToJobRequest, final JobLaunchingGateway jobLaunchingGateway) { 
     return IntegrationFlows.from(fileMessageSource(), 
       c -> c.poller(Pollers.fixedDelay(10, TimeUnit.SECONDS))) 
       .transform(fileMessageToJobRequest) 
       .handle(jobLaunchingGateway) 
       .get(); 
    } 

    @Bean 
    public JobLaunchingGateway jobLaunchingGateway(final JobLauncher jobLauncher) { 
     return new JobLaunchingGateway(jobLauncher); 
    } 

    // Step definition omitted for readability 

    @Bean(name = "myJob") 
    public Job myJob(final JobCompletionNotificationListener listener, final @Qualifier("step1") Step step) { 
     return jobBuilderFactory.get("myJob") 
       .preventRestart() 
       .incrementer(new RunIdIncrementer()) 
       .listener(listener) 
       .flow(myStep) 
       .end() 
       .build(); 
    } 
} 

一切正常,但我不斷收到以下異常作業執行後:從執行作業後發佈Message s到輸出通道

2017-06-28 16:55:20.510 ERROR 5480 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=JobLaunchRequest: myJob, parameters={input.file.name=/tmp/example.csv}, headers={id=99f4784a-1531-7d64-5abd-4ffe23455c45, timestamp=1498661720296}] 
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:210) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55) 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) 
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:287) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    ... 41 more 

如何停止JobLaunchingGateway

回答

1

這是因爲JobLaunchingGateway是reqeust/reply組件,它需要一個outputChannelreplyChannel標頭。同時,你只有這在你的配置:

.handle(jobLaunchingGateway) 
.get(); 

如果你不感興趣,從那裏的回覆,你可以只添加這到底:

.handle(jobLaunchingGateway) 
.channel("nullChannel") 
.get(); 
+0

太好了!我試着用ServiceActivator來做這件事,不知道它也可以從流程中完成。謝謝。 – aturkovic

+1

還有一個'JobLaunchingMessageHandler',你可以使用,如果你不想等待工作結果。 –

+0

因此'JobLaunchingGateway'是一個合適的'MessageHandler',它在返回JobExecution之前只是委託給'JobLaunchingMessageHandler'?直接調用處理程序似乎是一個更好的主意,然後感謝提示。 – aturkovic