2017-08-08 62 views
1

我有一個現有的spring批處理項目,它根據啓動過程中的功能切換決定從MySQL或ArangoDB(NoSql數據庫)中讀取數據,並執行一些處理並再次寫回到MySQL/ArangoDB。Spring針對不同數據庫的批處理多個讀取器

現在MySQL的讀寫器配置是一樣的東西下面,

@Bean 
@Primary 
@StepScope 
public HibernatePagingItemReader reader(
     @Value("#{jobParameters[oldMetadataDefinitionId]}") Long oldMetadataDefinitionId) { 

    Map<String, Object> queryParameters = new HashMap<>(); 
    queryParameters.put(Constants.OLD_METADATA_DEFINITION_ID, oldMetadataDefinitionId); 


    HibernatePagingItemReader<Long> reader = new HibernatePagingItemReader<>(); 

    reader.setUseStatelessSession(false); 
    reader.setPageSize(250); 
    reader.setParameterValues(queryParameters); 

    reader.setSessionFactory(((HibernateEntityManagerFactory) entityManagerFactory.getObject()).getSessionFactory()); 
    return reader; 
} 

和我還有一個阿朗戈讀者像下面,

@Bean 
@StepScope 
public ListItemReader arangoReader(
     @Value("#{jobParameters[oldMetadataDefinitionId]}") Long oldMetadataDefinitionId) { 

    List<InstanceDTO> instanceList = new ArrayList<InstanceDTO>(); 

    PersistenceService arangoPersistence = arangoConfiguration 
      .getPersistenceService()); 

    List<Long> instanceIds = arangoPersistence.getDefinitionInstanceIds(oldMetadataDefinitionId); 

    instanceIds.forEach((instanceId) -> 
    { 
     InstanceDTO instanceDto = new InstanceDTO(); 
     instanceDto.setDefinitionID(oldMetadataDefinitionId); 
     instanceDto.setInstanceID(instanceId); 
     instanceList.add(instanceDto); 

    }); 

    return new ListItemReader(instanceList); 

} 

和我的步設置爲以下,

@Bean 
@SuppressWarnings("unchecked") 
public Step InstanceMergeStep(ListItemReader arangoReader, ItemWriter<MetadataInstanceDTO> arangoWriter, 
     ItemReader<Long> mysqlReader, ItemWriter<Long> mysqlWriter) { 

    Step step = null; 
    if (arangoUsage) { 
     step = steps.get("arangoInstanceMergeStep") 


       .<Long, Long>chunk(1) 

       .reader(arangoReader) 

       .writer(arangoWriter) 


       .faultTolerant() 


       .skip(Exception.class) 


       .skipLimit(10) 

       .taskExecutor(stepTaskExecutor()) 

       .build(); 


     ((TaskletStep) step).registerChunkListener(chunkListener); 
    } 
    else { 
     step = steps.get("mysqlInstanceMergeStep") 


       .<Long, Long>chunk(1) 

       .reader(mysqlReader) 

       .writer(mysqlWriter) 

       .faultTolerant() 

       .skip(Exception.class) 


       .skipLimit(failedSkipLimit) 

       .taskExecutor(stepTaskExecutor()) 

       .build(); 


     ((TaskletStep) step).registerChunkListener(chunkListener); 

    } 

    return step; 
} 

MySQL讀取器通過HibernatePagingItemReader支持分頁支持,以便它可以處理e數百萬項目沒有任何內存問題。

我想實現相同的分頁支持arango閱讀器每次迭代只能獲取250個文檔如何修改arango閱讀器代碼來實現這個目標?

回答

0

首先ListItemReader的文檔說 - 用於測試所以不要用它來生產。代替實際的具體類型,而是從您的所有閱讀器Bean返回一個ItemReader

話雖如此,Spring Batch API或Spring Data似乎並不支持Arango DB。最近我能找到的是this

(我之前沒有和Arango DB合作過)。

所以,在我看來,你必須寫,通過可能實現的抽象類實現分頁自己的自定義阿朗戈閱讀器 - 通過上面的類擴展org.springframework.batch.item.database.AbstractPagingItemReader

如果它不是可行的,您可能需要實施一切從頭開始。 Spring Batch API中的所有分頁閱讀器擴展了這個抽象類,包括HibernatePagingItemReader

另外請記住,阿朗戈記錄集應該有某種排序來實現分頁,所以我們可以將網頁的區別的 - 0 &頁-1等(類似ORDER BY條款,BETWEEN操作&小於,運營商等更SQL。同樣需要FETCH FIRST XXX ROWSLIMIT子類的東西)。

由您自己執行並不是一項非常艱鉅的任務,因爲您必須計算總可能的項目,然後對它們進行排序,然後分頁並一次只提取一頁。

看看API的實現像 - HibernatePagingItemReader等來獲取想法。

希望它有幫助!

+0

謝謝@sabir以上澄清! ,是否可以使用動態startIndex和endIndex多次調用閱讀器arangoMergeStep,以僅使用spring引導爲AQL查詢提取有限記錄。 – siva

+0

它是由框架重複調用的reader的read()方法。這個'read()'方法應該一直返回一個項目,直到頁面完成,然後獲取下一頁。你爲最後一頁的最後一項返回'null'。看看現有的類之一,例如'JdbcPagingItemReader'或'HibernatePagingItemReader'等。這將是您的自定義讀者的工作來維護這些項目和計數,因爲每次調用讀取時合同都會返回一個項目。 –

相關問題