2016-10-05 55 views
0

我有一個Spring批處理作業,包含分區步驟和分區步驟正在處理塊。從Spring Writer啓動Runnable批處理分區步驟

我可以從方法public void write(List<? extends VO> itemsToWrite)進一步啓動新線程(執行Runnable)嗎?

基本上,筆者寫到這裏使用Lucene索引和自作家有chunk-sizeList項目,我認爲除以List成段,每段傳遞到一個新的Runnable

這是一個很好的方法嗎?

我編寫了一個示例,它工作大部分時間,但卡住了幾次。

有什麼我需要擔心的嗎?或者有沒有什麼內置的春季批次來實現這一目標?

我不想寫單個線程發生的整個塊。我希望進一步劃分大塊。

Lucene的IndexWriter是線程安全的一個方法是上市here

示例代碼 - 作家將獲取我打開從線程池中的線程項目的List?即使我等待游泳池終止爲一個塊,是否會有任何顧慮?

@Override 
    public void write(List<? extends IndexerInputVO> inputItems) throws Exception { 


     int docsPerThread = Constants.NUMBER_OF_DOCS_PER_INDEX_WRITER_THREADS; 
     int docSize = inputItems.size(); 
     int remainder = docSize%docsPerThread; 
     int poolSize = docSize/docsPerThread; 

     ExecutorService executor = Executors.newFixedThreadPool(poolSize+1); 


     int fromIndex=0; 
     int toIndex = docsPerThread; 

     if(docSize < docsPerThread){ 
      executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems)); 
     }else{ 
      for(int i=1;i<=poolSize;i++){ 
       executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex))); 
       fromIndex+=docsPerThread; 
       toIndex+=docsPerThread; 
      } 

      if(remainder != 0){ 
       toIndex=docSize; 
       executor.submit(new IndexWriterRunnable(this.luceneObjects,service,inputItems.subList(fromIndex, toIndex))); 
      } 
     } 

     executor.shutdown(); 

     while(executor.isTerminated()){ 
      ; 
     } 

回答

0

我不確定在作者中啓動新線程是個好主意。 這些線程超出了Spring批處理框架的範圍,因此您需要爲上述實現關閉和取消策略。如果一個段的處理失敗,則可能導致整個隊列失敗。

作爲替代方法,我可以建議您將自定義的作家列表從作家推薦到下一步,如官方文檔中所述passingDataToFutureSteps

相關問題