我有一個現有的spring批處理項目,它根據啓動過程中的功能切換決定從MySQL或ArangoDB(NoSql數據庫)中讀取數據,並執行一些處理並再次寫回到MySQL/ArangoDB。Spring針對不同數據庫的批處理多個讀取器
現在MySQL的讀寫器配置是一樣的東西下面,
@Bean
@Primary
@StepScope
public HibernatePagingItemReader reader(
@Value("#{jobParameters[oldMetadataDefinitionId]}") Long oldMetadataDefinitionId) {
Map<String, Object> queryParameters = new HashMap<>();
queryParameters.put(Constants.OLD_METADATA_DEFINITION_ID, oldMetadataDefinitionId);
HibernatePagingItemReader<Long> reader = new HibernatePagingItemReader<>();
reader.setUseStatelessSession(false);
reader.setPageSize(250);
reader.setParameterValues(queryParameters);
reader.setSessionFactory(((HibernateEntityManagerFactory) entityManagerFactory.getObject()).getSessionFactory());
return reader;
}
和我還有一個阿朗戈讀者像下面,
@Bean
@StepScope
public ListItemReader arangoReader(
@Value("#{jobParameters[oldMetadataDefinitionId]}") Long oldMetadataDefinitionId) {
List<InstanceDTO> instanceList = new ArrayList<InstanceDTO>();
PersistenceService arangoPersistence = arangoConfiguration
.getPersistenceService());
List<Long> instanceIds = arangoPersistence.getDefinitionInstanceIds(oldMetadataDefinitionId);
instanceIds.forEach((instanceId) ->
{
InstanceDTO instanceDto = new InstanceDTO();
instanceDto.setDefinitionID(oldMetadataDefinitionId);
instanceDto.setInstanceID(instanceId);
instanceList.add(instanceDto);
});
return new ListItemReader(instanceList);
}
和我的步設置爲以下,
@Bean
@SuppressWarnings("unchecked")
public Step InstanceMergeStep(ListItemReader arangoReader, ItemWriter<MetadataInstanceDTO> arangoWriter,
ItemReader<Long> mysqlReader, ItemWriter<Long> mysqlWriter) {
Step step = null;
if (arangoUsage) {
step = steps.get("arangoInstanceMergeStep")
.<Long, Long>chunk(1)
.reader(arangoReader)
.writer(arangoWriter)
.faultTolerant()
.skip(Exception.class)
.skipLimit(10)
.taskExecutor(stepTaskExecutor())
.build();
((TaskletStep) step).registerChunkListener(chunkListener);
}
else {
step = steps.get("mysqlInstanceMergeStep")
.<Long, Long>chunk(1)
.reader(mysqlReader)
.writer(mysqlWriter)
.faultTolerant()
.skip(Exception.class)
.skipLimit(failedSkipLimit)
.taskExecutor(stepTaskExecutor())
.build();
((TaskletStep) step).registerChunkListener(chunkListener);
}
return step;
}
MySQL讀取器通過HibernatePagingItemReader支持分頁支持,以便它可以處理e數百萬項目沒有任何內存問題。
我想實現相同的分頁支持arango閱讀器每次迭代只能獲取250個文檔如何修改arango閱讀器代碼來實現這個目標?
謝謝@sabir以上澄清! ,是否可以使用動態startIndex和endIndex多次調用閱讀器arangoMergeStep,以僅使用spring引導爲AQL查詢提取有限記錄。 – siva
它是由框架重複調用的reader的read()方法。這個'read()'方法應該一直返回一個項目,直到頁面完成,然後獲取下一頁。你爲最後一頁的最後一項返回'null'。看看現有的類之一,例如'JdbcPagingItemReader'或'HibernatePagingItemReader'等。這將是您的自定義讀者的工作來維護這些項目和計數,因爲每次調用讀取時合同都會返回一個項目。 –