我有一個spring啓動/集成/批處理,它將運行並輪詢SFTP上的文件。Spring批處理集成Java DSL和RunIdIncrementer不增加
我希望能夠使用在Job中定義的RunIdIncrementer
來重新啓動一個作業(可能是因爲應用程序已重新啓動,或者因爲某些原因我們再次收到相同的文件)具有相同參數(基本上是相同文件)組態。
不幸的是run.id = 1不遞增,我得到一個JobInstanceAlreadyCompleteException
作業配置
@Autowired
private JobBuilderFactory jobBuilders;
@Bean
public Job importOffersJob() {
Job job = jobBuilders.get("importOffersJob")
.start(importOffersStep)
.listener(traceJobExecutionListener())
.incrementer(new RunIdIncrementer())
.build();
return job;
}
集成
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(SftpSessionFactory())
.regexFilter(".*\\.xml.mini$")
.deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles())
.preserveTimestamp(Boolean.TRUE)
.autoCreateLocalDirectory(Boolean.TRUE)
.remoteDirectory(intCfg.getSftpRemoteDirectory())
.localDirectory(new File(intCfg.getSftpLocalDirectory())
),
e -> e.id("sftpInboundAdapter")
.poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1)))
.transform(fileToJobLaunchRequestTransformer())
.handle(jobLaunchingGw())
.handle(logger())
.get();
}
public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> {
private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class);
@Autowired
@Qualifier("importOffersJob")
private Job job;
@Override
public JobLaunchRequest transform(Message<File> source) {
log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload()
.getAbsolutePath());
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath());
//jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());
JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters());
return new JobLaunchRequest(job, jobParams);
}
}
如果我取消註釋jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());
這是工作,但我認爲重點是能夠重用在我的作業配置中定義的增量器是不是?
(當我正在運行的同批爲沒有整合增量正在一個簡單的彈簧引導)
千恩萬謝
謝謝Gary,你的回答很有用,但我仍然不明白爲什麼我的@autowired Job不使用在其配置'.incrementer(新的RunIdIncrementer())'中定義的增量器。這是預期的行爲嗎?當我作爲一個簡單的彈簧啓動應用程序啓動時(似乎它從存儲庫中自動獲取最後的作業參數),它還有什麼想法?最後,如果我需要從存儲庫中獲取最後的作業參數,我應該重複使用'JobInstanceDao'還是已經有一些幫助器來實現它?謝謝! – landbit
我不得不看你的工作配置,特別是你如何建立工作參數。 '>不使用增量器'。它__是使用作業中的增量器,它只是每次在'transform()'中傳遞一個vanilla'JobParameters'。由於這個新的JP沒有'run.id',它總是被設置爲1. –
這是工作配置!我做了很少的測試,結論是我沒有讀取持久狀態來檢索作業的最後一個好參數。我借用了[link] https://github.com/spring-projects/spring-batch/blob/master/spring-batch-core/src/main/java/org/springframework/batch/core/中的代碼啓動/支持[link] /CommandLineJobRunner.java'getNextJobParameters'並將其放入我的轉換器'FileToJobLaunchRequestTransformer'中,現在我可以看到我的run.id遞增了。我認爲增值部分是工作配置魔術的一部分。再次感謝加里! – landbit