2015-06-29 101 views
2

我有一個spring啓動/集成/批處理,它將運行並輪詢SFTP上的文件。Spring批處理集成Java DSL和RunIdIncrementer不增加

我希望能夠使用在Job中定義的RunIdIncrementer來重新啓動一個作業(可能是因爲應用程序已重新啓動,或者因爲某些原因我們再次收到相同的文件)具有相同參數(基本上是相同文件)組態。

不幸的是run.id = 1不遞增,我得到一個JobInstanceAlreadyCompleteException

作業配置

@Autowired 
private JobBuilderFactory jobBuilders; 

@Bean 
public Job importOffersJob() { 

    Job job = jobBuilders.get("importOffersJob") 
      .start(importOffersStep) 
      .listener(traceJobExecutionListener()) 
      .incrementer(new RunIdIncrementer()) 
      .build(); 

    return job; 
} 

集成

@Bean 
public IntegrationFlow ftpInboundFlow() { 
    return IntegrationFlows 
      .from(Sftp.inboundAdapter(SftpSessionFactory()) 
       .regexFilter(".*\\.xml.mini$") 
       .deleteRemoteFiles(intCfg.getSftpDeleteRemoteFiles()) 
       .preserveTimestamp(Boolean.TRUE) 
       .autoCreateLocalDirectory(Boolean.TRUE) 
       .remoteDirectory(intCfg.getSftpRemoteDirectory()) 
       .localDirectory(new File(intCfg.getSftpLocalDirectory()) 
      ), 
       e -> e.id("sftpInboundAdapter") 
       .poller(Pollers.fixedRate(intCfg.getSftpPollerInMinutes(), TimeUnit.MINUTES).maxMessagesPerPoll(1))) 
      .transform(fileToJobLaunchRequestTransformer())   
      .handle(jobLaunchingGw()) 
      .handle(logger()) 
      .get(); 
} 

public class FileToJobLaunchRequestTransformer implements GenericTransformer<Message<File>, JobLaunchRequest> { 

    private final static Logger log = LoggerFactory.getLogger(FileToJobLaunchRequestTransformer.class); 

    @Autowired 
    @Qualifier("importOffersJob") 
    private Job job; 

    @Override 
    public JobLaunchRequest transform(Message<File> source) { 
     log.info("FileToJobLaunchRequestTransformer, source.getPayload().getAbsolutePath(): {}", source.getPayload() 
       .getAbsolutePath()); 

     JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); 
     jobParametersBuilder.addString("pathToFile", "file:" + source.getPayload().getAbsolutePath()); 
     //jobParametersBuilder.addString("UUID", UUID.randomUUID().toString()); 

     JobParameters jobParams = job.getJobParametersIncrementer().getNext(jobParametersBuilder.toJobParameters()); 

     return new JobLaunchRequest(job, jobParams); 
     } 
    } 

如果我取消註釋jobParametersBuilder.addString("UUID", UUID.randomUUID().toString());這是工作,但我認爲重點是能夠重用在我的作業配置中定義的增量器是不是?

(當我正在運行的同批爲沒有整合增量正在一個簡單的彈簧引導)

千恩萬謝

回答

2

RunIdIncrementer.getNext()

public JobParameters getNext(JobParameters parameters) { 

    JobParameters params = (parameters == null) ? new JobParameters() : parameters; 

    long id = params.getLong(key, 0L) + 1; 
    return new JobParametersBuilder(params).addLong(key, id).toJobParameters(); 
} 

您正在創建一個新的作業參數每次這樣它總是會返回1,因爲run.id不存在。

如果將jobParametersBuilder到現場變壓器,旁邊job,它會工作,但僅限於這個實例您的應用程序中。它會在下一次啓動您的應用時從1開始。

要重新生存,您需要在某處保存run.id值,或者需要從存儲庫中獲取最後的作業參數。

+0

謝謝Gary,你的回答很有用,但我仍然不明白爲什麼我的@autowired Job不使用在其配置'.incrementer(新的RunIdIncrementer())'中定義的增量器。這是預期的行爲嗎?當我作爲一個簡單的彈簧啓動應用程序啓動時(似乎它從存儲庫中自動獲取最後的作業參數),它還有什麼想法?最後,如果我需要從存儲庫中獲取最後的作業參數,我應該重複使用'JobInstanceDao'還是已經有一些幫助器來實現它?謝謝! – landbit

+0

我不得不看你的工作配置,特別是你如何建立工作參數。 '>不使用增量器'。它__是使用作業中的增量器,它只是每次在'transform()'中傳遞一個vanilla'JobParameters'。由於這個新的JP沒有'run.id',它總是被設置爲1. –

+0

這是工作配置!我做了很少的測試,結論是我沒有讀取持久狀態來檢索作業的最後一個好參數。我借用了[link] https://github.com/spring-projects/spring-batch/blob/master/spring-batch-core/src/main/java/org/springframework/batch/core/中的代碼啓動/支持[link] /CommandLineJobRunner.java'getNextJobParameters'並將其放入我的轉換器'FileToJobLaunchRequestTransformer'中,現在我可以看到我的run.id遞增了。我認爲增值部分是工作配置魔術的一部分。再次感謝加里! – landbit

相關問題