2015-09-18 29 views
1

我使用Spring Batch將表從源數據庫克隆到目標數據庫。該作業是在服務層通過jobLauncher手動啓動的,並傳入作業參數。我應該如何使用.tasklet()或.chunk()才能讓作業成功完成?

一切運作正常,但使用下面的當前配置(步驟定義中使用.chunk(10))時,只有10行被克隆,隨後拋出異常:Caused by: java.sql.SQLException: Result set already closed。

應該如何正確定義步驟,才能完成read->write,把整個表從源數據庫寫入目標數據庫?

@Configuration 
@EnableBatchProcessing 
public class DatasetProcessingContext { 

    // Placeholder (null) passed to jdbcReader(...)/jdbcWriter(...) when the step is wired;
    // the name indicates the real values are expected to come from job parameters.
    private static final String OVERRIDEN_BY_JOB_PARAMETER = null; 
    // Names under which the step and the job are created by the builder factories.
    private static final String DATASET_PROCESSING_STEP = "datasetProcessingStep"; 
    private static final String DATASET_PROCESSING_JOB = "datasetProcessingJob"; 

    // Job parameter keys; they are referenced by the #{jobParameters['...']} expressions
    // on the reader and writer bean methods, so callers launching the job must use them.
    public static final String SUBSYSTEM = "subsystem"; 
    public static final String SQL = "sql"; 
    public static final String SOURCE_DATASOURCE = "sourceDatasource"; 
    public static final String INSERT_QUERY = "insertQuery"; 
    public static final String TARGET_DATASOURCE = "targetDatasource"; 

    // Data source selected by the DEV_DATA_SOURCE qualifier (constant declared outside this snippet).
    @Autowired 
    @Qualifier(DEV_DATA_SOURCE) 
    private DataSource devDataSource; 

    //set of datasources 

    // Transaction manager handed to the job repository factory bean.
    @Autowired 
    private PlatformTransactionManager transactionManager; 

    // Row mappers available to the reader; looked up by
    // subsystem + TABLE_MESSAGE_DATA_ROW_MAPPER (presumably the bean name - confirm).
    @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") 
    @Autowired 
    private Map<String, TableMessageDataRowMapper> tableMessageDataRowMappers; 

    // Prepared-statement setters available to the writer; looked up by
    // subsystem + TABLE_MESSAGE_DATA_PREPARED_STATEMENT_SETTER (presumably the bean name - confirm).
    @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") 
    @Autowired 
    private Map<String, TableMessageDataPreparedStatementSetter> messageDataPreparedStatementSetters; 

    // Builder factory used to create the job definition.
    @Autowired 
    private JobBuilderFactory jobsFactory; 

    // Builder factory used to create the step definition.
    @Autowired 
    private StepBuilderFactory stepsFactory; 

    /**
     * Exposes the job repository produced by a map-based repository factory bean
     * that is constructed with the injected transaction manager.
     *
     * @return the repository obtained from the factory bean
     * @throws Exception if the factory bean cannot produce the repository
     */
    @Bean
    public JobRepository jobRepository() throws Exception {
        MapJobRepositoryFactoryBean repositoryFactory = new MapJobRepositoryFactoryBean(transactionManager);
        return repositoryFactory.getObject();
    }

    /**
     * Provides the registry in which job definitions are recorded.
     *
     * @return a new map-backed job registry
     */
    @Bean
    public JobRegistry jobRegistry() {
        JobRegistry registry = new MapJobRegistry();
        return registry;
    }

    /**
     * Creates the bean post-processor and points it at the registry returned by
     * {@link #jobRegistry()}.
     *
     * @return the configured post-processor
     */
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
        JobRegistryBeanPostProcessor registrar = new JobRegistryBeanPostProcessor();
        registrar.setJobRegistry(jobRegistry());
        return registrar;
    }

    /**
     * Creates the launcher used to start jobs, backed by the repository from
     * {@link #jobRepository()}.
     *
     * @return the configured launcher
     * @throws Exception if the job repository cannot be obtained
     */
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository());
        return launcher;
    }

    /**
     * Registers the "step" scope that the reader and writer beans of this
     * configuration are declared in.
     *
     * @return a new step scope instance
     */
    @Bean
    public static StepScope stepScope() {
        StepScope scope = new StepScope();
        return scope;
    }

    /**
     * Step-scoped cursor reader whose configuration is taken from job parameters.
     *
     * @param subsystem        job parameter used to pick the row mapper
     *                         (key: subsystem + TABLE_MESSAGE_DATA_ROW_MAPPER)
     * @param sql              job parameter holding the query the cursor executes
     * @param sourceDatasource job parameter naming a {@code TargetDataSource} enum constant;
     *                         it is resolved to the data source the cursor reads from
     * @return the configured reader
     */
    @Bean
    @SuppressWarnings("unchecked")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemStreamReader jdbcReader(
            @Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
            @Value("#{jobParameters['" + SQL + "']}") String sql,
            @Value("#{jobParameters['" + SOURCE_DATASOURCE + "']}") String sourceDatasource) {

        JdbcCursorItemReader cursorReader = new JdbcCursorItemReader();

        // Translate the parameter into the enum constant, then into the actual data source.
        DataSource source = getDataSourceFromEnum(TargetDataSource.valueOf(sourceDatasource));
        cursorReader.setDataSource(source);

        cursorReader.setSql(sql);

        // The mapper is chosen per subsystem from the injected map.
        RowMapper mapper = (RowMapper) tableMessageDataRowMappers.get(subsystem + TABLE_MESSAGE_DATA_ROW_MAPPER);
        cursorReader.setRowMapper(mapper);

        return cursorReader;
    }

    /**
     * Step-scoped batch writer whose configuration is taken from job parameters.
     *
     * @param subsystem        job parameter used to pick the prepared-statement setter
     *                         (key: subsystem + TABLE_MESSAGE_DATA_PREPARED_STATEMENT_SETTER)
     * @param insertQuery      job parameter holding the SQL executed for every item
     * @param targetDatasource job parameter naming a {@code TargetDataSource} enum constant;
     *                         it is resolved to the data source the writer writes to
     * @return the configured writer
     */
    @Bean
    @SuppressWarnings("unchecked")
    @Scope(value = "step", proxyMode = ScopedProxyMode.INTERFACES)
    public ItemWriter jdbcWriter(
            @Value("#{jobParameters['" + SUBSYSTEM + "']}") String subsystem,
            @Value("#{jobParameters['" + INSERT_QUERY + "']}") String insertQuery,
            @Value("#{jobParameters['" + TARGET_DATASOURCE + "']}") String targetDatasource) {

        JdbcBatchItemWriter batchWriter = new JdbcBatchItemWriter();

        // Translate the parameter into the enum constant, then into the actual data source.
        DataSource target = getDataSourceFromEnum(TargetDataSource.valueOf(targetDatasource));
        batchWriter.setDataSource(target);

        batchWriter.setSql(insertQuery);

        // The statement setter is chosen per subsystem from the injected map.
        TableMessageDataPreparedStatementSetter statementSetter =
                messageDataPreparedStatementSetters.get(subsystem + TABLE_MESSAGE_DATA_PREPARED_STATEMENT_SETTER);
        batchWriter.setItemPreparedStatementSetter(statementSetter);

        return batchWriter;
    }

    /**
     * Builds the step that copies every row delivered by {@code jdbcReader} to
     * {@code jdbcWriter}.
     *
     * <p>The chunk boundary is governed by a {@code DefaultResultCompletionPolicy}
     * instead of a fixed commit interval. With {@code chunk(10)} this step was observed
     * to copy only the first 10 rows and then fail with
     * {@code java.sql.SQLException: Result set already closed}. The completion policy
     * ends the chunk only once the reader reports no further result, so the whole
     * result set is read and written as a single chunk.
     *
     * <p>NOTE(review): because everything is handled in one chunk, all rows are held
     * until the write; confirm this is acceptable for the table sizes being cloned.
     *
     * @return the configured step; it may be re-run even after it has completed
     */
    @Bean
    @SuppressWarnings("unchecked")
    public Step datasetProcessingStep() {

        // The null placeholders passed below are expected to be superseded by the
        // #{jobParameters[...]} expressions declared on the reader/writer bean methods.
        return stepsFactory.get(DATASET_PROCESSING_STEP)
                // Fully qualified so that no additional import is required.
                .chunk(new org.springframework.batch.repeat.policy.DefaultResultCompletionPolicy())
                .reader(jdbcReader(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
                .writer(jdbcWriter(OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER, OVERRIDEN_BY_JOB_PARAMETER))
                .allowStartIfComplete(true)
                .build();
    }

    /**
     * Defines the job, which consists solely of the step returned by
     * {@link #datasetProcessingStep()}.
     *
     * @return the assembled job
     */
    @Bean
    public Job datasetProcessingJob() {
        return jobsFactory
                .get(DATASET_PROCESSING_JOB)
                .start(datasetProcessingStep())
                .build();
    }

回答

1

在步驟定義中使用.chunk(new DefaultResultCompletionPolicy())適合我的情況。當返回的結果為null時(也就是ResultSet已經讀取完畢),此策略的isComplete(RepeatContext context, RepeatStatus result)會返回true。