2013-01-13 39 views
2

我需要創建恢復模式。 在我的模式中,我只能在給定的時間窗口上啓動作業。 如果作業失敗,它只會在下一個時間窗口重新啓動,並且完成後我想要啓動爲此窗口提前計劃的計劃作業。 作業之間唯一的不同是時間窗口參數。春季批次:重新啓動作業,然後自動啓動下一個作業

我想到JobExecutionDecider與JobExplorer結合或重寫Joblauncher。但是,一切似乎太侵入。

我沒有找到符合我需求的例子任何想法都將是最受歡迎的。

回答

2

只是爲了回顧一下基於由incomplete-co.de提供的建議實際上是完成的。 我創建了一個類似於下面的恢復流程。恢復流程包裝了我的實際批次,僅負責將正確的作業參數提供給內部作業。它可以是第一次執行時的初始參數,正常執行時的新參數或最後一次執行失敗時的舊參數。

<batch:job id="recoveryWrapper" 
     incrementer="wrapperRunIdIncrementer" 
     restartable="true"> 
    <batch:decision id="recoveryFlowDecision" decider="recoveryFlowDecider"> 
     <batch:next on="FIRST_RUN" to="defineParametersOnFirstRun" /> 
     <batch:next on="RECOVER" to="recover.batchJob " /> 
     <batch:next on="CURRENT" to="current.batchJob " /> 
    </batch:decision> 
    <batch:step id="defineParametersOnFirstRun" next="current.batchJob"> 
     <batch:tasklet ref="defineParametersOnFirstRunTasklet"/> 
    </batch:step> 
    <batch:step id="recover.batchJob " next="current.batchJob"> 
     <batch:job ref="batchJob" job-launcher="jobLauncher" 
          job-parameters-extractor="jobParametersExtractor" /> 
    </batch:step> 
    <batch:step id="current.batchJob" > 
     <batch:job ref="batchJob" job-launcher="jobLauncher" 
          job-parameters-extractor="jobParametersExtractor" /> 
    </batch:step> 
</batch:job> 

解決方案的心臟是RecoveryFlowDeciderJobParametersExtractor同時使用春天批量重啓機制。 RecoveryFlowDecider將查詢JobExplorer和JobRepository以瞭解我們是否在上次運行中發生故障。它將把最後一次執行放在包裝器的執行上下文中,以便稍後在JobParametersExtractor中使用。 請注意使用runIdIncremeter允許重新執行包裝作業。

@Component 
public class RecoveryFlowDecider implements JobExecutionDecider { 
     private static final String FIRST_RUN = "FIRST_RUN"; 
     private static final String CURRENT = "CURRENT"; 
     private static final String RECOVER = "RECOVER"; 

     @Autowired 
     private JobExplorer jobExplorer; 
     @Autowired 
     private JobRepository jobRepository; 

     @Override 
     public FlowExecutionStatus decide(JobExecution jobExecution 
             ,StepExecution stepExecution) { 
      // the wrapper is named as the wrapped job + WRAPPER 
      String wrapperJobName = jobExecution.getJobInstance().getJobName(); 
      String jobName; 
      jobName = wrapperJobName.substring(0,wrapperJobName.indexOf(EtlConstants.WRAPPER)); 
      List<JobInstance> instances = jobExplorer.getJobInstances(jobName, 0, 1); 
       JobInstance internalJobInstance = instances.size() > 0 ? instances.get(0) : null; 

      if (null == internalJobInstance) { 
       return new FlowExecutionStatus(FIRST_RUN); 
      } 
      JobExecution lastExecution = jobRepository.getLastJobExecution(internalJobInstance.getJobName() 
    ,internalJobInstance.getJobParameters()); 
      //place the last execution on the context (wrapper context to use later) 
      jobExecution.getExecutionContext().put(EtlConstants.LAST_EXECUTION, lastExecution); 
       ExitStatus exitStatus = lastExecution.getExitStatus(); 

      if (ExitStatus.FAILED.equals(exitStatus) || ExitStatus.UNKNOWN.equals(exitStatus)) { 
       return new FlowExecutionStatus(RECOVER); 
      }else if(ExitStatus.COMPLETED.equals(exitStatus)){ 
       return new FlowExecutionStatus(CURRENT);  
      } 
      //We should never get here unless we have a defect 
      throw new RuntimeException("Unexpecded batch status: "+exitStatus+" in decider!"); 

     } 

} 

然後JobParametersExtractor將最後執行的結果再次測試,在失敗作業的情況下,它將成爲用於執行失敗的作業觸發春季Bacth重啓機制的原始參數。否則,它將創建一組新的參數並在正常過程中執行。

@Component 
    public class JobExecutionWindowParametersExtractor implements 
      JobParametersExtractor { 
     @Override 
     public JobParameters getJobParameters(Job job, StepExecution stepExecution) { 

      // Read the last execution from the wrapping job 
      // in order to build Next Execution Window 
      JobExecution lastExecution= (JobExecution) stepExecution.getJobExecution().getExecutionContext().get(EtlConstants.LAST_EXECUTION);; 

      if(null!=lastExecution){ 
       if (ExitStatus.FAILED.equals(lastExecution.getExitStatus())) { 
        JobInstance instance = lastExecution.getJobInstance(); 
        JobParameters parameters = instance.getJobParameters();     
           return parameters; 
       } 
      }  
      //We do not have failed execution or have no execution at all we need to create a new execution window 
      return buildJobParamaters(lastExecution,stepExecution); 
     } 
... 
} 
0

是否有可能以相反的方式做到這一點?

在每個時間窗口中,提交該時間窗口預定的作業。

但是,作業的第一步應檢查前一時間窗口中的作業是否成功完成。如果之前失敗了,那麼提交以前的工作,並等待完成,然後再進入自己的邏輯。

2

你有沒有考慮過JobStep?也就是說,一個步驟確定是否有任何其他工作要運行。這個值被設置到StepExecutionContext中。一個JobExecutionDecider然後檢查這個值;如果存在,則指向啓動作業的JobStep。

這裏就可以了DOC http://docs.spring.io/spring-batch/reference/htmlsingle/#external-flows

+0

感謝: JobSteps聽起來是個好主意,我已經寫了jobParametersExtractor提供我與預期的參數(基於故障運行和新的運行)。 現在我想在外部工作中整合基於決策者的兩個工具。 聽起來合乎邏輯嗎? –

+0

是的 - 決定者可以決定是否採取JobStep或者不採用JobStep,真正打開你以編程方式表達流程邏輯。記住你可以通過在構造函數中傳入你喜歡的任何字符串來創建自己的FlowExecutionStatus,它將成爲XML路由可用的值。 –

相關問題