0

我創建了一個使用Spring Batch和調度功能的Spring Boot應用程序。當我只創建一個作業時,一切工作正常。但是,當我嘗試使用模塊化方法創建另一個作業時,即使兩個作業使用的是不同的讀取器,我也收到了一些錯誤,例如「讀取器已關閉」,還有一些錯誤與版本(version)相關。這些作業和步驟被運行了很多次,並且被重複執行。(標題:Spring Boot + Spring Batch多作業的創建和調度)

有誰可以指導我如何解決這些問題,並讓這些作業以相互獨立、並行的方式運行嗎?

下面是配置類:ModularJobConfiguration.java、DeptBatchConfiguration.java、CityBatchConfiguration.java和BatchScheduler.java

/**
 * Parent configuration that switches Spring Batch into modular mode and
 * publishes one {@link ApplicationContextFactory} per job configuration class.
 */
@Configuration
@EnableBatchProcessing(modular = true)
public class ModularJobConfiguration {

    /**
     * Context factory built from {@link DeptBatchConfiguration}.
     *
     * @return factory wrapping the department job configuration class
     */
    @Bean
    public ApplicationContextFactory firstJob() {
        final ApplicationContextFactory deptContextFactory =
                new GenericApplicationContextFactory(DeptBatchConfiguration.class);
        return deptContextFactory;
    }

    /**
     * Context factory built from {@link CityBatchConfiguration}.
     *
     * @return factory wrapping the city job configuration class
     */
    @Bean
    public ApplicationContextFactory secondJob() {
        final ApplicationContextFactory cityContextFactory =
                new GenericApplicationContextFactory(CityBatchConfiguration.class);
        return cityContextFactory;
    }

}


@Configuration 
@EnableBatchProcessing 
@Import({BatchScheduler.class}) 
public class DeptBatchConfiguration { 

    private static final Logger LOGGER = LoggerFactory.getLogger(DeptBatchConfiguration.class); 

    @Autowired 
    private SimpleJobLauncher jobLauncher; 

    @Autowired 
    public JobBuilderFactory jobBuilderFactory; 

    @Autowired 
    public StepBuilderFactory stepBuilderFactory; 

    @Autowired 
    public JobExecutionListener listener; 

    public ItemReader<DepartmentModelReader> deptReaderSO; 


    @Autowired 
    @Qualifier("dataSourceReader") 
    private DataSource dataSourceReader; 


    @Autowired 
    @Qualifier("dataSourceWriter") 
    private DataSource dataSourceWriter; 



    @Scheduled(cron = "0 0/1 * * * ?") 
    public void performFirstJob() throws Exception { 

     long startTime = System.currentTimeMillis(); 
     LOGGER.info("Job1 Started at :" + new Date()); 
     JobParameters param = new JobParametersBuilder().addString("JobID1",String.valueOf(System.currentTimeMillis())).toJobParameters(); 

     JobExecution execution = (JobExecution) jobLauncher.run(importDeptJob(jobBuilderFactory,stepdept(deptReaderSO,customWriter()),listener), param); 

     long endTime = System.currentTimeMillis(); 
     LOGGER.info("Job1 finished at " + (endTime - startTime)/1000 + " seconds with status :" + execution.getExitStatus()); 
    } 

    @Bean 
    public ItemReader<DepartmentModelReader> deptReaderSO() { 
     //LOGGER.info("Inside deptReaderSO Method"); 
     JdbcCursorItemReader<DepartmentModelReader> deptReaderSO = new JdbcCursorItemReader<>(); 
     //deptReaderSO.setSql("select id, firstName, lastname, random_num from reader"); 
     deptReaderSO.setSql("SELECT DEPT_CODE,DEPT_NAME,FULL_DEPT_NAME,CITY_CODE,CITY_NAME,CITY_TYPE_NAME,CREATED_USER_ID,CREATED_G_DATE,MODIFIED_USER_ID,MODIFIED_G_DATE,RECORD_ACTIVITY,DEPT_CLASS,DEPT_PARENT,DEPT_PARENT_NAME FROM TBL_SAMPLE_SAFTY_DEPTS"); 
     deptReaderSO.setDataSource(dataSourceReader); 
     deptReaderSO.setRowMapper(
       (ResultSet resultSet, int rowNum) -> { 
        if (!(resultSet.isAfterLast()) && !(resultSet.isBeforeFirst())) { 
         DepartmentModelReader recordSO = new DepartmentModelReader(); 
         recordSO.setDeptCode(resultSet.getString("DEPT_CODE")); 
         recordSO.setDeptName(resultSet.getString("DEPT_NAME")); 
         recordSO.setFullDeptName(resultSet.getString("FULL_DEPT_NAME")); 
         recordSO.setCityCode(resultSet.getInt("CITY_CODE")); 
         recordSO.setCityName(resultSet.getString("CITY_NAME")); 
         recordSO.setCityTypeName(resultSet.getString("CITY_TYPE_NAME")); 
         recordSO.setCreatedUserId(resultSet.getInt("CREATED_USER_ID")); 
         recordSO.setCreatedGDate(resultSet.getDate("CREATED_G_DATE")); 
         recordSO.setModifiedUserId(resultSet.getString("MODIFIED_USER_ID")); 
         recordSO.setModifiedGDate(resultSet.getDate("MODIFIED_G_DATE")); 
         recordSO.setRecordActivity(resultSet.getInt("RECORD_ACTIVITY")); 
         recordSO.setDeptClass(resultSet.getInt("DEPT_CLASS")); 
         recordSO.setDeptParent(resultSet.getString("DEPT_PARENT")); 
         recordSO.setDeptParentName(resultSet.getString("DEPT_PARENT_NAME")); 

         // LOGGER.info("RowMapper record : {}", recordSO.getDeptCode() +" | "+recordSO.getDeptName()); 
         return recordSO; 
        } else { 
         LOGGER.info("Returning null from rowMapper"); 
         return null; 
        } 
       }); 
     return deptReaderSO; 
    } 

    @Bean 
    public ItemProcessor<DepartmentModelReader, DepartmentModelWriter> processor() { 
     //LOGGER.info("Inside Processor Method"); 
     return new RecordProcessor(); 
    } 

    @Bean 
    public ItemWriter<DepartmentModelWriter> customWriter(){ 
     //LOGGER.info("Inside customWriter Method"); 
     return new CustomItemWriter(); 
    } 

    @Bean 
    public Job importDeptJob(JobBuilderFactory jobs, Step stepdept,JobExecutionListener listener){ 
     return jobs.get("importDeptJob") 
       .incrementer(new RunIdIncrementer()) 
       .listener(listener()) 
       .flow(stepdept).end().build(); 
    } 

    @Bean 
    public Step stepdept(ItemReader<DepartmentModelReader> deptReaderSO, 
      ItemWriter<DepartmentModelWriter> writerSO) { 
     LOGGER.info("Inside stepdept Method"); 

     return stepBuilderFactory.get("stepdept").<DepartmentModelReader, DepartmentModelWriter>chunk(5) 
       .reader(deptReaderSO).processor(processor()).writer(customWriter()).transactionManager(platformTransactionManager(dataSourceWriter)).build(); 
    } 

    @Bean 
    public JobExecutionListener listener() { 
     return new JobCompletionNotificationListener(); 
    } 

    @Bean 
    public JdbcTemplate jdbcTemplate(DataSource dataSource) { 
     return new JdbcTemplate(dataSource); 
    } 

    @Bean 
    public BatchWriteService batchWriteService() { 
     return new BatchWriteService(); 
    } 

    @Bean 
    public PlatformTransactionManager platformTransactionManager(@Qualifier("dataSourceWriter") DataSource dataSourceWriter) { 
     JpaTransactionManager transactionManager = new JpaTransactionManager(); 
     transactionManager.setDataSource(dataSourceWriter); 
     return transactionManager; 
    } 
} 



/**
 * Spring Batch configuration for the city import job.
 *
 * <p>Declares the reader, processor, writer, step, job, listener and
 * transaction-manager beans for the job named {@code importCitiesJob}, and
 * launches that job from a {@code @Scheduled} method whose cron expression
 * is {@code "0 0/1 * * * ?"}.
 *
 * <p>NOTE(review): this class is loaded through a
 * {@code GenericApplicationContextFactory} by {@code ModularJobConfiguration}.
 * If it is also picked up by component scanning, its beans and its scheduled
 * method would presumably be registered once per context, which would explain
 * duplicated executions — confirm against the application's scan packages.
 */
@Configuration
@EnableBatchProcessing
@Import({BatchScheduler.class})
public class CityBatchConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(CityBatchConfiguration.class);

    @Autowired
    private SimpleJobLauncher jobLauncher;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobExecutionListener listener;

    /**
     * Never assigned anywhere in this class, so it is always {@code null}.
     * Retained only so that existing references keep compiling; use the
     * {@link #citiesReaderSO()} bean instead.
     */
    public ItemReader<CitiesModelReader> citiesReaderSO;

    @Autowired
    @Qualifier("dataSourceReader")
    private DataSource dataSourceReader;

    @Autowired
    @Qualifier("dataSourceWriter")
    private DataSource dataSourceWriter;

    /**
     * Launches the city import job with a fresh {@code JobID2} parameter
     * derived from the current time, then logs the elapsed time and the exit
     * status reported by the returned {@link JobExecution}.
     *
     * <p>NOTE(review): if the injected launcher runs jobs on an asynchronous
     * task executor, {@code run} presumably returns before the job completes,
     * so the logged duration and status would describe the launch rather than
     * the finished job — confirm against the launcher configuration.
     *
     * @throws Exception if the launcher rejects or fails to start the job
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    public void performSecondJob() throws Exception {
        long startTime = System.currentTimeMillis();
        LOGGER.info("\n Job2 Started at :{}", new Date());

        // A new parameter value per launch makes every launch a distinct job instance.
        JobParameters param = new JobParametersBuilder()
                .addString("JobID2", String.valueOf(System.currentTimeMillis()))
                .toJobParameters();

        // Pass the reader bean rather than the never-assigned citiesReaderSO field.
        Job job = importCitiesJob(jobBuilderFactory, stepcity(citiesReaderSO(), customCitiesWriter()), listener);
        JobExecution execution = jobLauncher.run(job, param);

        long endTime = System.currentTimeMillis();
        LOGGER.info("Job2 finished at {} seconds with status :{}",
                (endTime - startTime) / 1000, execution.getExitStatus());
    }

    /**
     * Cursor-based reader over {@code TBL_SAMPLE_SAFTY_CITIES}, using the
     * {@code dataSourceReader} data source.
     *
     * <p>The concrete reader type is returned (instead of the bare
     * {@code ItemReader} interface) so that the bean's stream lifecycle
     * methods stay visible on its declared type.
     *
     * <p>NOTE(review): this reader is stateful and is currently a singleton;
     * overlapping executions would share one cursor. Consider declaring the
     * bean step-scoped — confirm with the Spring Batch scoping documentation.
     *
     * @return the configured city reader
     */
    @Bean
    public JdbcCursorItemReader<CitiesModelReader> citiesReaderSO() {
        JdbcCursorItemReader<CitiesModelReader> reader = new JdbcCursorItemReader<>();
        reader.setSql("SELECT CITY_CODE,CITY_NAME,PARENT_CITY,CITY_TYPE,CITY_TYPE_NAME,CREATED_G_DATE,CREATED_USER_ID,MODIFIED_G_DATE,MODIFIED_USER_ID,RECORD_ACTIVITY FROM TBL_SAMPLE_SAFTY_CITIES");
        reader.setDataSource(dataSourceReader);
        // The mapper is only handed a cursor that is positioned on a row, so no
        // isBeforeFirst()/isAfterLast() guard is needed. Those calls are optional
        // for forward-only result sets, and returning null from the mapper would
        // be interpreted by the step as "no more input".
        reader.setRowMapper(
                (ResultSet resultSet, int rowNum) -> {
                    CitiesModelReader recordSO = new CitiesModelReader();
                    recordSO.setCityCode(resultSet.getLong("CITY_CODE"));
                    recordSO.setCityName(resultSet.getString("CITY_NAME"));
                    recordSO.setParentCity(resultSet.getInt("PARENT_CITY"));
                    recordSO.setCityType(resultSet.getString("CITY_TYPE"));
                    recordSO.setCityTypeName(resultSet.getString("CITY_TYPE_NAME"));
                    recordSO.setCreatedGDate(resultSet.getDate("CREATED_G_DATE"));
                    recordSO.setCreatedUserId(resultSet.getString("CREATED_USER_ID"));
                    recordSO.setModifiedGDate(resultSet.getDate("MODIFIED_G_DATE"));
                    recordSO.setModifiedUserId(resultSet.getString("MODIFIED_USER_ID"));
                    recordSO.setRecordActivity(resultSet.getInt("RECORD_ACTIVITY"));
                    return recordSO;
                });
        return reader;
    }

    /**
     * @return a new {@code RecordCitiesProcessor} converting reader models to writer models
     */
    @Bean
    public ItemProcessor<CitiesModelReader, CitiesModelWriter> citiesProcessor() {
        return new RecordCitiesProcessor();
    }

    /**
     * @return a new {@code CustomCitiesWriter} for city writer models
     */
    @Bean
    public ItemWriter<CitiesModelWriter> customCitiesWriter() {
        LOGGER.info("Inside customCitiesWriter Method");
        return new CustomCitiesWriter();
    }

    /**
     * Builds the job named {@code importCitiesJob} as a single-step flow.
     *
     * @param jobs     factory used to create the job builder
     * @param stepcity the only step of the flow
     * @param listener execution listener registered on the job
     * @return the assembled job
     */
    @Bean
    public Job importCitiesJob(JobBuilderFactory jobs, Step stepcity, JobExecutionListener listener) {
        LOGGER.info("Inside importCitiesJob Method");
        return jobs.get("importCitiesJob")
                .incrementer(new RunIdIncrementer())
                // Register the listener that was actually passed in.
                .listener(listener)
                .flow(stepcity).end().build();
    }

    /**
     * Builds the chunk-oriented step named {@code stepcity} with a commit
     * interval of 5, using the transaction manager created for
     * {@code dataSourceWriter}.
     *
     * @param readerSO reader supplying city rows
     * @param writerSO writer receiving the processed items
     * @return the assembled step
     */
    @Bean
    public Step stepcity(ItemReader<CitiesModelReader> readerSO,
            ItemWriter<CitiesModelWriter> writerSO) {
        LOGGER.info("Inside stepCity Method");

        return stepBuilderFactory.get("stepcity")
                .<CitiesModelReader, CitiesModelWriter>chunk(5)
                .reader(readerSO)
                .processor(citiesProcessor())
                // Use the writer that was passed in instead of ignoring the parameter.
                .writer(writerSO)
                .transactionManager(platformTransactionManager(dataSourceWriter))
                .build();
    }

    /**
     * @return a new {@code JobCompletionNotificationListener}
     */
    @Bean
    public JobExecutionListener listener() {
        return new JobCompletionNotificationListener();
    }

    /**
     * @param dataSource data source the template operates on
     * @return a {@link JdbcTemplate} bound to {@code dataSource}
     */
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }

    /**
     * @return a new {@code BatchWriteService}
     */
    @Bean
    public BatchWriteService batchWriteService() {
        return new BatchWriteService();
    }

    /**
     * Creates a {@link JpaTransactionManager} and sets only its data source.
     *
     * <p>NOTE(review): no entity manager factory is set here; presumably one
     * is resolved from the container — confirm.
     *
     * @param dataSourceWriter data source qualified as {@code dataSourceWriter}
     * @return the transaction manager used by the step
     */
    @Bean
    public PlatformTransactionManager platformTransactionManager(@Qualifier("dataSourceWriter") DataSource dataSourceWriter) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setDataSource(dataSourceWriter);
        return transactionManager;
    }
}





/**
 * Scheduling-enabled configuration that declares the batch infrastructure
 * beans: a resourceless transaction manager, a map-based job repository
 * factory, the job repository obtained from it, and a job launcher.
 *
 * <p>NOTE(review): the launcher is given a {@code SimpleAsyncTaskExecutor}
 * while the repository comes from a {@code MapJobRepositoryFactoryBean};
 * presumably concurrent job executions against that repository are the source
 * of the reported version errors — confirm against the Spring Batch docs.
 */
@Configuration
@EnableScheduling
public class BatchScheduler {

    private static final Logger LOGGER = LoggerFactory.getLogger(BatchScheduler.class);

    /**
     * @return a new {@code ResourcelessTransactionManager}
     */
    @Bean
    public ResourcelessTransactionManager resourcelessTransactionManager() {
        final ResourcelessTransactionManager manager = new ResourcelessTransactionManager();
        return manager;
    }

    /**
     * Creates the map-based repository factory and initialises it explicitly
     * before handing it back.
     *
     * @param txManager transaction manager passed to the factory constructor
     * @return the initialised repository factory
     * @throws Exception if {@code afterPropertiesSet()} fails
     */
    @Bean
    public MapJobRepositoryFactoryBean mapJobRepositoryFactory(
            ResourcelessTransactionManager txManager) throws Exception {
        LOGGER.info("Inside mapJobRepositoryFactory method");

        final MapJobRepositoryFactoryBean repositoryFactory =
                new MapJobRepositoryFactoryBean(txManager);
        repositoryFactory.afterPropertiesSet();
        return repositoryFactory;
    }

    /**
     * Exposes the repository produced by the given factory.
     *
     * @param factory repository factory to query
     * @return the object returned by {@code factory.getObject()}
     * @throws Exception if the factory cannot produce the repository
     */
    @Bean
    public JobRepository jobRepository(
            MapJobRepositoryFactoryBean factory) throws Exception {
        LOGGER.info("Inside jobRepository method");

        final JobRepository repository = factory.getObject();
        return repository;
    }

    /**
     * Builds a {@code SimpleJobLauncher} bound to the given repository and
     * configured with a {@code SimpleAsyncTaskExecutor}.
     *
     * @param jobRepository repository the launcher records executions in
     * @return the configured launcher
     */
    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        LOGGER.info("Inside jobLauncher method");

        final SimpleJobLauncher asyncLauncher = new SimpleJobLauncher();
        asyncLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        asyncLauncher.setJobRepository(jobRepository);
        return asyncLauncher;
    }
}
+0

1.請提供您的配置。 2.這聽起來像是讀取器是有狀態的,但沒有設置為步驟作用域(step scope),這意味着你在多個作業/步驟執行之間共享了同一個實例。 –

+0

嗨,我編輯了這個問題,並添加了配置。可以請你檢查一下嗎? – Nithin

回答

0

你的讀取器不是線程安全的,也沒有設置為步驟作用域(step scope)。正因爲如此,你遇到了併發問題。對每個有狀態的ItemReader(即實現了ItemStream的那些,例如JdbcCursorItemReader),通過添加@StepScope註解將其配置爲步驟作用域,事情應該就可以正常工作了。

+0

嗨,我試過這樣做,但它沒有解決問題。所以,我改爲在Application類的main方法中運行這兩個作業,而不是從DeptBatchConfiguration.java和CityBatchConfiguration.java類中運行它們,並且沒有把讀取器設置爲步驟作用域,結果運行成功了。我不太理解爲什麼從主類運行作業時就可以正常工作。現在,我面臨的挑戰是如何在我的主應用程序類中調度這些作業。如果你有任何建議,我非常歡迎。 – Nithin