2016-08-23 113 views
1

在Spring批處理中,我試圖讀取一個CSV文件,並希望將每行分配給一個單獨的線程並對其進行處理。我試圖通過使用任務執行程序來實現它,它正在工作,如果我沒有使用作業參數獲取文件名。如果我從scope="step"以後通過作業參數,所有線程正在從文件中讀取相同的行。如果我改變了scope="job",如果是的話請建議方式嗎?目前,我下面得到一個錯誤:spring批處理多線程文件讀取

產生的原因:java.lang.IllegalStateException:沒有適用範圍適用於範圍名 '工作'

請幫助註冊...

查找下面的Job.xml

<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch"  restartable="true"> 
    <step id="step" allow-start-if-complete="true"> 
     <partition step="step2" partitioner="partitioner"> 
      <handler grid-size="3" task-executor="taskExecutor" /> 
     </partition> 
    </step> 
</job> 

    <bean id="partitioner" class="com.range.part.RangePartitioner"> 
</bean> 

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" /> 

<step id="step2" xmlns="http://www.springframework.org/schema/batch"> 
    <tasklet transaction-manager="transactionManager"> 
     <chunk reader="itemReader" writer="cutomitemWriter" processor="itemProcessor" commit-interval="100" /> 
    </tasklet> 
</step> 
<bean id="itemProcessor" class="com.range.processor.UserProcessor" scope="step"> 
<property name="threadName" value="#{stepExecutionContext[name]}"/> 
</bean> 

<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="job"> 
<property name="resource" value="file:#{jobParameters[file]}"> 
</property>  
    <!-- <property name="linesToSkip" value="1"/> --> 
<property name="lineMapper"> 
     <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> 
      <property name="lineTokenizer"> 
       <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> 
        <property name="delimiter" value="," /> 
        <!-- <property name="names" value="transactionBranch,batchEntryDate,batchNo,channelID,CountryCode" />--> 
     </bean> 
      </property> 
      <property name="fieldSetMapper"> 
       <bean class="com.fieldset.FieldsetMapper"> 

       </bean> 
      </property> 
     </bean> 
    </property> 
    </bean> 

<bean id="cutomitemWriter" class="com.range.processor.customitemWritter"> 
</bean> 
+1

提供的信息不夠全面。你需要發佈一些代碼。 – marthursson

+0

看起來你正在讀取每個線程內的文件而不是外部線程。 –

+0

你是否想讓它們在不同的線程上讀取(不一定是最高性能的選項),或者只是在不同的線程上處理? –

回答

1

我想我們可以在其上使用分區器的方式。在分區級別,我們可以讀取文件(通過使用任何CSV閱讀器或Spring Reader也可以),然後處理每一行。

每一行都會被添加到分區程序的隊列(Map)中,以便達到您的要求。

我在這裏的代碼貼出來供大家參考

公共類LinePartitioner實現分區程序{

@Value("#{jobParameters['fileName']}") 
private String fileName; 

Map<String, ExecutionContext> queue = new HashMap<>(); 

@Override 
public Map<String, ExecutionContext> partition(int gridSize) { 

    BufferedReader reader = new BufferedReader(new FileReader(this.fileName)); 
    List<String> lines = new ArrayList<>(); 
    int count = 0; 
    while ((line = reader.readLine()) != null) { 

     ExecutionContext value = new ExecutionContext(); 
     value.put("lineContent", line); 
     value.put("lineCount", count+1); 

     queue.put(++count, value); 
    } 

    return queue; 
} 

}

正如上面的代碼,你可以通過任何一個CSV閱讀器或Spring Reader來替代閱讀器用Pojo對象簡化映射場。

請讓我知道如果您需要完整的程序,我會爲您編寫和上傳。

感謝, Nghia酒店

- 用一個例子有1000個項目讀者建立分區程序讀者

@Override 
    public Map<String, ExecutionContext> partition(int gridSize) { 
     try { 
      Map<String, ExecutionContext> queue = new HashMap<>(); 

      List<List<String>> trunks = new ArrayList<>(); 

      // read and store data to a list of trunk 
      int chunkSize = 1000; 
      int count = 1; 
      try (BufferedReader br = new BufferedReader(new FileReader("your file"))) { 
       String line; 
       List items = null; 
       while ((line = br.readLine()) != null) { 
        if (count % chunkSize == 0) { 
         items = new ArrayList(); 
         trunks.add(items); 
        } 

        items.add(line); 
       } 
      } 

      // add to queue to start prorcessing 
      for (int i=0; i<trunks.size(); i++) { 
       ExecutionContext value = new ExecutionContext(); 
       value.put("items", trunks.get(i)); 
       queue.put("trunk"+i, value); 
      } 

      return queue; 
     } 

     catch (Exception e) { 
      // handle exception 
     } 
} 
+0

嗨,謝謝你soo ..我已經開始實施你的方法..一旦完成,我會讓你知道,如果我需要任何幫助.. 。並嘗試另一種方法也.. – gautam

+0

我已經將原始文件拆分爲多個文件,並使用MultiFileResourcePartitioner分配給每個線程的單個文件。請確認哪種方法可以帶來更多的性能優勢......預先感謝 – gautam

+0

就你而言,由於文件讀取在分區類本身中得到了關注,我可以直接處理通過ExceutionContext進行的記錄嗎?另一個單獨的讀者不是正確的? – gautam

0

更新您可以看到this example (on Github)與多線程任務導入一個很大的CSV文件(如200,000行)插入數據庫並將數據從DB導出到JSON文件(FileReader和FileWriter將具有無線程安全)。

<batch:job id="transformJob"> 
    <batch:step id="deleteDir" next="cleanDB"> 
     <batch:tasklet ref="fileDeletingTasklet" /> 
    </batch:step> 
    <batch:step id="cleanDB" next="countThread"> 
     <batch:tasklet ref="cleanDBTasklet" /> 
    </batch:step> 
    <batch:step id="countThread" next="split"> 
     <batch:tasklet ref="countThreadTasklet" /> 
    </batch:step> 
    <batch:step id="split" next="partitionerMasterImporter"> 
     <batch:tasklet> 
      <batch:chunk reader="largeCSVReader" writer="smallCSVWriter" 
       commit-interval="#{jobExecutionContext['chunk.count']}" /> 
     </batch:tasklet> 
    </batch:step> 
    <batch:step id="partitionerMasterImporter" next="partitionerMasterExporter"> 
     <partition step="importChunked" partitioner="filePartitioner"> 
      <handler grid-size="10" task-executor="taskExecutor" /> 
     </partition> 
    </batch:step> 
    <batch:step id="partitionerMasterExporter" next="concat"> 
     <partition step="exportChunked" partitioner="dbPartitioner"> 
      <handler grid-size="10" task-executor="taskExecutor" /> 
     </partition> 
    </batch:step> 
    <batch:step id="concat"> 
     <batch:tasklet ref="concatFileTasklet" /> 
    </batch:step> 
</batch:job> 

<batch:step id="importChunked"> 
    <batch:tasklet> 
     <batch:chunk reader="smallCSVFileReader" writer="dbWriter" 
      processor="importProcessor" commit-interval="500"> 
     </batch:chunk> 
    </batch:tasklet> 
</batch:step> 

<batch:step id="exportChunked"> 
    <batch:tasklet> 
     <batch:chunk reader="dbReader" writer="jsonFileWriter" 
      processor="exportProcessor" commit-interval="#{jobExecutionContext['chunk.count']}"> 
     </batch:chunk> 
    </batch:tasklet> 
</batch:step> 

<bean id="jsonFileWriter" class="com.batch.writer.PersonWriterToFile" 
    scope="step"> 
    <property name="outputPath" value="csv/chunked/paged-#{stepExecutionContext[page]}.json" /> 
</bean> 

<bean id="dbReader" class="com.batch.reader.PersonReaderFromDataBase" scope="step"> 
    <property name="iPersonRepository" ref="IPersonRepository" /> 
    <property name="page" value="#{stepExecutionContext[page]}"/> 
    <property name="size" value="#{stepExecutionContext[size]}"/> 
</bean> 

<bean id="countThreadTasklet" class="com.batch.tasklet.CountingTasklet" 
    scope="step"> 
    <property name="input" value="file:csv/input/#{jobParameters[filename]}" /> 
</bean> 

<bean id="cleanDBTasklet" class="com.batch.tasklet.CleanDBTasklet" /> 

<bean id="fileDeletingTasklet" class="com.batch.tasklet.FileDeletingTasklet"> 
    <property name="directory" value="file:csv/chunked/" /> 
</bean> 

<bean id="concatFileTasklet" class="com.batch.tasklet.FileConcatTasklet"> 
    <property name="directory" value="file:csv/chunked/" /> 
    <property name="outputFilename" value="csv/output/export.json" /> 
</bean> 

<bean id="filePartitioner" class="com.batch.partitioner.FilePartitioner"> 
    <property name="outputPath" value="csv/chunked/" /> 
</bean> 

<bean id="dbPartitioner" class="com.batch.partitioner.DBPartitioner" scope="step"> 
    <property name="pageSize" value="#{jobExecutionContext['chunk.count']}" /> 
</bean> 

<bean id="largeCSVReader" class="com.batch.reader.LineReaderFromFile" 
    scope="step"> 
    <property name="inputPath" value="csv/input/#{jobParameters[filename]}" /> 
</bean> 

<bean id="smallCSVWriter" class="com.batch.writer.LineWriterToFile" 
    scope="step"> 
    <property name="outputPath" value="csv/chunked/"></property> 
</bean> 

<bean id="smallCSVFileReader" class="com.batch.reader.PersonReaderFromFile" 
    scope="step"> 
    <constructor-arg value="csv/chunked/#{stepExecutionContext[file]}" /> 
</bean> 

<bean id="importProcessor" class="com.batch.processor.ImportPersonItemProcessor" /> 

<bean id="exportProcessor" class="com.batch.processor.ExportPersonItemProcessor" /> 

<bean id="dbWriter" class="com.batch.writer.PersonWriterToDataBase"> 
    <property name="iPersonRepository" ref="IPersonRepository" /> 
</bean> 

在這兩種情況下,partionner用於拼接成10個文件(每個線程一個文件)的導入和導出10個文件(每個線程過一個文件),那麼我們串接所有有一個文件。

希望得到這個幫助。