我正在編寫春季批處理作業。我正在實施自定義編寫器使用KafkaClientWriter extends AbstractItemStreamItemWriter<ProducerMessage>
如何在春季批次中爲每個作業實例啓動編寫器
我有每個實例必須是唯一的字段。但我可以看到這個班只發起一次。其餘作業有相同的作家類實例。 我的自定義閱讀器和處理器正在爲每項工作啓動。 以下是我的工作配置。我怎樣才能爲作家實現同樣的行爲?
@Bean
@Scope("job")
public ZipMultiResourceItemReader reader(@Value("#{jobParameters[fileName]}") String fileName, @Value("#{jobParameters[s3SourceFolderPrefix]}") String s3SourceFolderPrefix, @Value("#{jobParameters[timeStamp]}") long timeStamp, com.fastretailing.catalogPlatformSCMProducer.service.ConfigurationService confService) {
FlatFileItemReader faltFileReader = new FlatFileItemReader();
ZipMultiResourceItemReader zipReader = new ZipMultiResourceItemReader();
Resource[] resArray = new Resource[1];
resArray[0] = new FileSystemResource(new File(fileName));
zipReader.setArchives(resArray);
DefaultLineMapper<ProducerMessage> lineMapper = new DefaultLineMapper<ProducerMessage>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
CSVFieldMapper csvFieldMapper = new CSVFieldMapper(fileName, s3SourceFolderPrefix, timeStamp, confService);
lineMapper.setFieldSetMapper(csvFieldMapper);
faltFileReader.setLineMapper(lineMapper);
zipReader.setDelegate(faltFileReader);
return zipReader;
}
@Bean
@Scope("job")
public ItemProcessor<ProducerMessage, ProducerMessage> processor(@Value("#{jobParameters[timeStamp]}") long timeStamp) {
ProducerProcessor processor = new ProducerProcessor();
processor.setS3FileTimeStamp(timeStamp);
return processor;
}
@Bean
@ConfigurationProperties
public ItemWriter<ProducerMessage> writer() {
return new KafkaClientWriter();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
ItemReader reader, ItemWriter writer,
ItemProcessor processor, @Value("${reader.chunkSize}")
int chunkSize) {
LOGGER.info("Step configuration loaded with chunk size {}", chunkSize);
return stepBuilderFactory.get("step1")
.chunk(chunkSize).reader(reader)
.processor(processor).writer(writer)
.build();
}
@Bean
public StepScope stepScope() {
final StepScope stepScope = new StepScope();
stepScope.setAutoProxy(true);
return stepScope;
}
@Bean
public JobScope jobScope() {
final JobScope jobScope = new JobScope();
return jobScope;
}
@Bean
public Configuration configuration() {
return new Configuration();
}
我試着讓作家的作業範圍。但是在這種情況下,open並沒有被調用。這是我正在做一些初始化的地方。
將返回類型更改爲'KafkaClientWriter'並添加@Scope(「job」)'。 –
@M。 Denium:很好。謝謝,它工作。現在打開並關閉正確調用。爲什麼'@Scope(「job」)'返回類型爲'ItemWriter'沒有工作?我正在從寫入方法獲得一些自定義字段,本應從open打開。 –