2016-03-23 27 views
0

我正在編寫春季批處理作業。我正在實施自定義編寫器使用KafkaClientWriter extends AbstractItemStreamItemWriter<ProducerMessage>如何在春季批次中爲每個作業實例啓動編寫器

我有每個實例必須是唯一的字段。但我可以看到這個班只發起一次。其餘作業有相同的作家類實例。 我的自定義閱讀器和處理器正在爲每項工作啓動。 以下是我的工作配置。我怎樣才能爲作家實現同樣的行爲?

@Bean 
     @Scope("job") 
     public ZipMultiResourceItemReader reader(@Value("#{jobParameters[fileName]}") String fileName, @Value("#{jobParameters[s3SourceFolderPrefix]}") String s3SourceFolderPrefix, @Value("#{jobParameters[timeStamp]}") long timeStamp, com.fastretailing.catalogPlatformSCMProducer.service.ConfigurationService confService) { 
      FlatFileItemReader faltFileReader = new FlatFileItemReader(); 
      ZipMultiResourceItemReader zipReader = new ZipMultiResourceItemReader(); 
      Resource[] resArray = new Resource[1]; 
      resArray[0] = new FileSystemResource(new File(fileName)); 
      zipReader.setArchives(resArray); 
      DefaultLineMapper<ProducerMessage> lineMapper = new DefaultLineMapper<ProducerMessage>(); 
      lineMapper.setLineTokenizer(new DelimitedLineTokenizer()); 
      CSVFieldMapper csvFieldMapper = new CSVFieldMapper(fileName, s3SourceFolderPrefix, timeStamp, confService); 
      lineMapper.setFieldSetMapper(csvFieldMapper); 
      faltFileReader.setLineMapper(lineMapper); 
      zipReader.setDelegate(faltFileReader); 
      return zipReader; 
     } 

     @Bean 
     @Scope("job") 
     public ItemProcessor<ProducerMessage, ProducerMessage> processor(@Value("#{jobParameters[timeStamp]}") long timeStamp) { 
      ProducerProcessor processor = new ProducerProcessor(); 
      processor.setS3FileTimeStamp(timeStamp); 
      return processor; 
     } 

     @Bean 
     @ConfigurationProperties 
     public ItemWriter<ProducerMessage> writer() { 
      return new KafkaClientWriter(); 
     } 

     @Bean 
     public Step step1(StepBuilderFactory stepBuilderFactory, 
          ItemReader reader, ItemWriter writer, 
          ItemProcessor processor, @Value("${reader.chunkSize}") 
          int chunkSize) { 
      LOGGER.info("Step configuration loaded with chunk size {}", chunkSize); 
      return stepBuilderFactory.get("step1") 
        .chunk(chunkSize).reader(reader) 
        .processor(processor).writer(writer) 
        .build(); 
     } 

     @Bean 
     public StepScope stepScope() { 
      final StepScope stepScope = new StepScope(); 
      stepScope.setAutoProxy(true); 
      return stepScope; 
     } 

     @Bean 
     public JobScope jobScope() { 
      final JobScope jobScope = new JobScope(); 
      return jobScope; 
     } 

     @Bean 
     public Configuration configuration() { 
      return new Configuration(); 
     } 

我試着讓作家的作業範圍。但是在這種情況下,open並沒有被調用。這是我正在做一些初始化的地方。

+0

將返回類型更改爲'KafkaClientWriter'並添加@Scope(「job」)'。 –

+0

@M。 Denium:很好。謝謝,它工作。現在打開並關閉正確調用。爲什麼'@Scope(「job」)'返回類型爲'ItemWriter '沒有工作?我正在從寫入方法獲得一些自定義字段,本應從open打開。 –

回答

0

當使用基於java的配置和作用域代理時,會發生什麼情況就是檢測到該方法的返回類型,並且創建了代理。因此,當您返回ItemWriter時,您將得到一個僅實現ItemWriter的JDK代理,而您的open方法位於ItemStream接口上。由於該接口未包含在代理中,因此無法調用該方法。

將返回類型更改爲KafkaClientWriterItemStreamWriter< ProducerMessage>(假設KafkaCLientWriter實現該方法)。接下來添加@Scope("job"),您應該再次使用適當範圍的作者調用open方法。