2015-05-11 108 views
4

試圖實現Spring批處理,但面臨一個奇怪的問題,我們的ItemReader類只執行一次。Spring批量項目閱讀器只執行一次

下面是詳細信息。

  1. 如果我們在DB中有1000行。
  2. 我們的項目從讀寫器讀取DB 1000行,並通過列表ItemWriter
  3. ItemWriter成功地刪除所有項目。
  4. 現在ItemReader再次嘗試從數據庫中獲取數據,但沒有找到,因此返回NULL,因此執行停止。
  5. 但是我們已經配置了批處理,並且每分鐘都會使用Quartz調度程序執行。
  6. 現在,如果我們通過dump import在數據庫中插入1000行,批處理作業應該在下次執行時選擇該數據,但它甚至不會執行,儘管 JobLauncher正在執行。

配置: -

1.我們有ItemReader,ItemWriter與提交間隔等於1

<batch:job id="csrfTokenBatchJob"> 
    <batch:step id="step1"> 
     <tasklet> 
     <chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></chunk> 
     </tasklet> 
    </batch:step> 
    </batch:job> 

2.Job定在每分鐘被觸發。

<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean"> 
    <property name="triggers"> 
     <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean"> 
     <property name="jobDetail" ref="jobDetail" /> 
     <property name="cronExpression" value="0 0/1 * * * ?" /> 
     </bean> 
    </property> 
    </bean> 

3.Job配置

<bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean"> 
    <property name="jobClass" value="com.tavant.oauth.batch.job.CSRFTokenJobLauncher" /> 
    <property name="jobDataAsMap"> 
     <map> 
      <entry key="jobName" value="csrfTokenCleanUpBatchJob" /> 
      <entry key="jobLocator" value-ref="jobRegistry" /> 
      <entry key="jobLauncher" value-ref="jobLauncher" /> 
     </map> 
    </property> 
</bean> 

首次成功執行,但後來它不執行,但是我可以在JobLauncher正在執行日誌中看到。

@Component("csrfTokenReader") 
@Scope(value="step") 
public class CSRFTokenReader implements ItemReader<List<CSRFToken>> { 

    private static final Logger logger = LoggerFactory.getLogger(CSRFTokenReader.class); 

    @Autowired 
    private CleanService cleanService; 

    @Override 
    public List<CSRFToken> read() { 
     List<CSRFToken> csrfTokenList = null; 
     try{ 

      int keepUpto = Integer.valueOf(PropertiesContext.getInstance().getProperties().getProperty("token.keep", "1")); 

      Calendar calTime = Calendar.getInstance(); 
      calTime.add(Calendar.HOUR, -keepUpto); 
      Date toKeep = calTime.getTime(); 

      csrfTokenList = cleanService.getCSRFTokenByTime(toKeep); 
     } 
     catch(Throwable th){ 
      logger.error("Exception in running job At " + new Date() + th); 
     } 
     if(CollectionUtils.isEmpty(csrfTokenList)){ 
      return null; 
     } 
     return csrfTokenList; 
    } 
} 

編輯: -

public class CSRFTokenJobLauncher extends QuartzJobBean { 
    static final String JOB_NAME = "jobName"; 
    private JobLocator jobLocator; 
    private JobLauncher jobLauncher; 
    public void setJobLocator(JobLocator jobLocator) { 
     this.jobLocator = jobLocator; 
    } 
    public void setJobLauncher(JobLauncher jobLauncher) { 
     this.jobLauncher = jobLauncher; 
    } 
    @Override 
    protected void executeInternal(JobExecutionContext context) { 
     Map<String, Object> jobDataMap = context.getMergedJobDataMap(); 
     String jobName = (String) jobDataMap.get(JOB_NAME); 
     log.info("Quartz trigger firing with Spring Batch jobName="+jobName); 
     JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap); 
     try { 
      jobLauncher.run(jobLocator.getJob(jobName), jobParameters); 
     } 
     catch (JobExecutionException e) { 
      log.error("Could not execute job.", e); 
     } 
    } 
    private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) { 
     JobParametersBuilder builder = new JobParametersBuilder(); 
     for (Entry<String, Object> entry : jobDataMap.entrySet()) { 
      String key = entry.getKey(); 
      Object value = entry.getValue(); 
      if (value instanceof String && !key.equals(JOB_NAME)) { 
       builder.addString(key, (String) value); 
      } 
      else if (value instanceof Float || value instanceof Double) { 
       builder.addDouble(key, ((Number) value).doubleValue()); 
      } 
      else if (value instanceof Integer || value instanceof Long) { 
       builder.addLong(key, ((Number)value).longValue()); 
      } 
      else if (value instanceof Date) { 
       builder.addDate(key, (Date) value); 
      } 
     } 
     return builder.toJobParameters(); 
    } 
} 
+0

第二次執行期間你確定'csrfTokenList'不是空的嗎? –

+0

不,它甚至不會執行第二次,儘管JobLauncher每隔1分鐘執行一次 – RE350

+0

第一次執行是否正常終止? –

回答

4

後浪費時間的時間,這個問題現在似乎已得到解決,我已經在tasklet.Now批處理項讀取器,配置allow-start-if-complete="true"正在執行按時間表。

<batch:job id="csrfTokenBatchJob"> 
    <batch:step id="step1"> 
     <batch:tasklet allow-start-if-complete="true"> 
     <batch:chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></batch:chunk> 
     </batch:tasklet> 
    </batch:step> 
    </batch:job> 
1

Spring批處理記錄數據庫中的每個作業執行。這就是爲什麼春季批次需要區分每項工作的原因。它會檢查作業是否已在當天執行過,並且除非任何作業參數與之前的運行不同,否則將不會再次啓動,如果啓用了完整設置,則允許啓動。

OPTION1: - 如上答覆中提到,我們可以使用允許啓動,如果完成=「真」

OPTION2: - 始終傳遞工作參數,它是當前日期時間戳記。這種工作參數值總是唯一的。

JobExecution jobExecution = jobLauncher.run(reportJob, new JobParametersBuilder() 
        .addDate("now", new Date()) 

OPTION3: - 使用例如RunIdIncrementer的遞增器,所以我們並不需要確保每次都通過唯一的作業參數。

@Bean 
    public Job job1(JobBuilderFactory jobs, Step s1) { 
     return jobs.get("job1") 
       .incrementer(new RunIdIncrementer()) 
       .flow(s1) 
       .end() 
       .build(); 
    }