2014-01-22 53 views
4

在Spring批處理中我試圖讀取一個CSV文件,並希望將每行分配給一個單獨的線程並對其進行處理。我試圖通過使用TaskExecutor來實現它,但是所有線程正在發生的事情是一次選取同一行。我也嘗試使用Partioner實現這個概念,同樣的事情也發生了。請參閱下面我的配置Xml。使用Spring批處理文件的多線程文件項讀取器

步驟說明

<step id="Step2"> 
     <tasklet task-executor="taskExecutor"> 
      <chunk reader="reader" processor="processor" writer="writer" commit-interval="1" skip-limit="1"> 
      </chunk> 
     </tasklet> 
    </step> 

       <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader"> 
<property name="resource" value="file:cvs/user.csv" /> 

<property name="lineMapper"> 
    <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> 
     <!-- split it --> 
     <property name="lineTokenizer"> 
      <bean 
      class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> 
      <property name="names" value="userid,customerId,ssoId,flag1,flag2" /> 
     </bean> 
     </property> 
     <property name="fieldSetMapper"> 

      <!-- map to an object --> 
      <bean 
      class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"> 
      <property name="prototypeBeanName" value="user" /> 
      </bean>   
     </property> 

     </bean> 
    </property> 

     </bean> 

     <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"> 
<property name="concurrencyLimit" value="4"/> 

我曾嘗試與不同類型的任務執行的,但是所有的人都在相同的行爲方式。我怎樣才能將每一行分配給一個單獨的線程?

+0

你可以參考這個http:// stackoverflow。COM /問題/ 20243629 /如何對增加 - - 性能 - 的 - flatfileitemreader功能於springbatch的/ 20261835#20261835 –

回答

6

FlatFileItemReader不是線程安全的。在您的示例中,您可以嘗試將CS​​V文件拆分爲較小的CSV文件,然後使用MultiResourcePartitioner來處理它們中的每一個。這可以分兩步完成,一個用於分割原始文件(如10個較小的文件),另一個用於處理分割的文件。這樣您就不會有任何問題,因爲每個文件都將由一個線程處理。

例子:

<batch:job id="csvsplitandprocess"> 
    <batch:step id="step1" next="step2master"> 
    <batch:tasklet> 
     <batch:chunk reader="largecsvreader" writer="csvwriter" commit-interval="500"> 
     </batch:chunk> 
    </batch:tasklet> 
    </batch:step> 
    <batch:step id="step2master"> 
    <partition step="step2" partitioner="partitioner"> 
     <handler grid-size="10" task-executor="taskExecutor"/> 
    </partition> 
</batch:step> 
</batch:job> 

<batch:step id="step2"> 
    <batch:tasklet> 
     <batch:chunk reader="smallcsvreader" writer="writer" commit-interval="100"> 
     </batch:chunk> 
    </batch:tasklet> 
</batch:step> 


<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
      <property name="corePoolSize" value="10" /> 
      <property name="maxPoolSize" value="10" /> 
    </bean> 

<bean id="partitioner" 
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"> 
<property name="resources" value="file:cvs/extracted/*.csv" /> 
</bean> 

的替代,而不是劃分可能是一個自定義線程安全的讀者誰將會創建一個線程的每一行,但可能分區是您最佳選擇

+0

是的......我意識到,我必須去這兩種選擇的......但哪一個明智的表現會更好嗎? – slowhandblues

+0

絕對分割,這就是因爲一個定製的閱讀器將line.On仍然工藝管線另一方面,許多較小的CSV文件將同時procesed(分區步).Keep記住,有不同的縮放techinques像優化性能等諸多因素玩提交間隔,跳過和重試策略,通常每個案例都有它自己的瓶頸。希望有所幫助! – dimzak

+0

好,在遠程分區中使用MultiThreadedFlatFileItemReader是否安全? https://github.com/sshcherbakov/spring-batch-talk/blob/master/src/main/java/org/springframework/batch/item/file/MultiThreadedFlatFileItemReader.java – vishal

1

你的問題是你的讀者不在範圍內。

這意味着:您的所有線程共享相同的輸入流(資源文件)。

要爲每個線程一行來處理,你需要:

  1. 確保所有線程讀取從開始的文件,以文件的 結束(每個線程都應該打開流並關閉對於每個執行上下文的 )
  2. 分區程序必須爲每個執行上下文注入開始和結束位置。
  3. 你的閱讀器必須閱讀與這個職位的文件。

我寫一些代碼,這是輸出:

com.test.partitioner.RangePartitioner類代碼:

public Map<String, ExecutionContext> partition() { 

    Map < String, ExecutionContext > result = new HashMap < String, ExecutionContext >(); 

    int range = 1; 
    int fromId = 1; 
    int toId = range; 

    for (int i = 1; i <= gridSize; i++) { 
     ExecutionContext value = new ExecutionContext(); 

     log.debug("\nStarting : Thread" + i); 
     log.debug("fromId : " + fromId); 
     log.debug("toId : " + toId); 

     value.putInt("fromId", fromId); 
     value.putInt("toId", toId); 

     // give each thread a name, thread 1,2,3 
     value.putString("name", "Thread" + i); 

     result.put("partition" + i, value); 

     fromId = toId + 1; 
     toId += range; 

    } 

    return result; 
} 

- >查看輸出控制檯

開始:線程1 fromId: 1 toId:1

Starting:Thread2 fromId:2 TOID:2

開始:Thread3 fromId:3 TOID:3

開始:Thread4 fromId:4 TOID:4

開始:Thread5 fromId:5 toId:5

Starting:Thread6 fromId:6 toId:6

開始:Thread7 fromId:7 TOID:7

開始:Thread8 fromId:8 TOID:8

開始:Thread9 fromId:9 TOID:9

啓動:Thread10 fromId:10 toId:10

看看配置波紋管:

http://www.springframework.org/schema/batch/spring-batch-2.2.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd「>

<import resource="../config/context.xml" /> 
<import resource="../config/database.xml" /> 

<bean id="mouvement" class="com.test.model.Mouvement" scope="prototype" /> 

<bean id="itemProcessor" class="com.test.processor.CustomItemProcessor" scope="step"> 
    <property name="threadName" value="#{stepExecutionContext[name]}" /> 
</bean> 
<bean id="xmlItemWriter" class="com.test.writer.ItemWriter" /> 

<batch:job id="mouvementImport" xmlns:batch="http://www.springframework.org/schema/batch"> 
    <batch:listeners> 
     <batch:listener ref="myAppJobExecutionListener" /> 
    </batch:listeners> 

    <batch:step id="masterStep"> 
     <batch:partition step="slave" partitioner="rangePartitioner"> 
      <batch:handler grid-size="10" task-executor="taskExecutor" /> 
     </batch:partition> 
    </batch:step> 
</batch:job> 

<bean id="rangePartitioner" class="com.test.partitioner.RangePartitioner" /> 

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" /> 

<batch:step id="slave"> 
    <batch:tasklet> 
     <batch:listeners> 
      <batch:listener ref="stepExecutionListener" /> 
     </batch:listeners> 

     <batch:chunk reader="mouvementReader" writer="xmlItemWriter" processor="itemProcessor" commit-interval="1"> 
     </batch:chunk> 

    </batch:tasklet> 
</batch:step> 



<bean id="stepExecutionListener" class="com.test.listener.step.StepExecutionListenerCtxInjecter" scope="step" /> 

<bean id="myAppJobExecutionListener" class="com.test.listener.job.MyAppJobExecutionListener" /> 

<bean id="mouvementReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"> 

    <property name="resource" value="classpath:XXXXX/XXXXXXXX.csv" /> 

    <property name="lineMapper"> 
     <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> 
      <property name="lineTokenizer"> 
       <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"> 
        <property name="delimiter" value="|" /> 
        <property name="names" 
         value="id,numen,prenom,grade,anneeScolaire,academieOrigin,academieArrivee,codeUsi,specialiteEmploiType,natureSupport,dateEffet,modaliteAffectation" /> 
       </bean> 
      </property> 
      <property name="fieldSetMapper"> 
       <bean class="com.test.mapper.MouvementFieldSetMapper" /> 
      </property> 
     </bean> 
    </property> 

</bean> 

<!-- <bean id="itemReader" scope="step" autowire-candidate="false" parent="mouvementReaderParent">--> 
<!--  <property name="resource" value="#{stepExecutionContext[fileName]}" />--> 
<!-- </bean>--> 

<bean id="mouvementReader" class="com.test.reader.MouvementItemReader" scope="step"> 
    <property name="delegate" ref="mouvementReaderParent" /> 
    <property name="parameterValues"> 
     <map> 
      <entry key="fromId" value="#{stepExecutionContext[fromId]}" /> 
      <entry key="toId" value="#{stepExecutionContext[toId]}" /> 
     </map> 
    </property> 
</bean> 

<!-- <bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">--> 
<!--  <property name="resource" value="file:xml/outputs/Mouvements.xml" />--> 
<!--  <property name="marshaller" ref="reportMarshaller" />--> 
<!--  <property name="rootTagName" value="Mouvement" />--> 
<!-- </bean>--> 

<bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller"> 
    <property name="classesToBeBound"> 
     <list> 
      <value>com.test.model.Mouvement</value> 
     </list> 
    </property> 
</bean> 

TODO :改變我的閱讀器在其他與位置(開始和結束位置)閱讀與Java中的Scanner類一樣。

希望得到這個幫助。

+0

有趣的解決方案的步驟。我已編輯修復格式並添加缺少的方法簽名,但我可能弄錯了。你可以請審查並添加正確的方法名稱和接線選項以獲得'gridSize'變量初始化? –

0

您可以將您的輸入文件拆分爲多個文件,使用Partitionner並使用線程加載小文件,但出錯時,必須在清理完數據庫後重新啓動所有作業。

<batch:job id="transformJob"> 
<batch:step id="deleteDir" next="cleanDB"> 
    <batch:tasklet ref="fileDeletingTasklet" /> 
</batch:step> 
<batch:step id="cleanDB" next="split"> 
    <batch:tasklet ref="countThreadTasklet" /> 
</batch:step> 
<batch:step id="split" next="partitionerMasterImporter"> 
    <batch:tasklet> 
     <batch:chunk reader="largeCSVReader" writer="smallCSVWriter" commit-interval="#{jobExecutionContext['chunk.count']}" /> 
    </batch:tasklet> 
</batch:step> 
<batch:step id="partitionerMasterImporter" next="partitionerMasterExporter"> 
    <partition step="importChunked" partitioner="filePartitioner"> 
     <handler grid-size="10" task-executor="taskExecutor" /> 
    </partition> 
</batch:step> 

完全example code working (on Github)

希望這有助於。