2015-10-30 86 views
0

我們有我們從文件中讀取的要求運行春季批處理作業,porocess,並寫入到一個平面文件。我的問題是,FlatFileItemReader會跟蹤它處理的記錄,以便如果作業在中間失敗,它可以拾取失敗的地方。重新啓動由多個線程

例如假設油門限制是2和提交間隔是10,我的文件有20條記錄。假設thread1正在處理前10條記錄,並且 thread2正在處理下10條記錄。如果線程2的所有10條記錄都成功處理,並且由於一條錯誤記錄導致線程1失敗,並且整個作業都失敗。下一次 當作業重新啓動時,彈簧如何識別未處理的記錄?

什麼是更好的方式來處理使用多線程的文件,並在同一時間能夠在中間無法啓動了。

<batch:job job-repository="jobRepository" id="insertIntoCsvFromCsvJob"> 
     <batch:step id="step1"> 
      <batch:tasklet transaction-manager="transactionManager" 
       task-executor="taskExecutor" throttle-limit="${throttle-limit}"> 
       <batch:chunk reader="csvFileItemReader" writer="customWriter" processor="compositeProcessor 
        commit-interval="${commit-interval}" > 
       </batch:chunk> 
      </batch:tasklet> 
     </batch:step> 
    </batch:job> 

    <bean id="csvFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"> 
     <property name="resource" value="classpath:files/input.csv" />   
     <property name="lineMapper" ref="fieldSetMapper" /> 
    </bean> 

    <bean id="csvFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"> 
     <property name="resource" value="file:c:/outout.csv" /> 
     <property name="shouldDeleteIfExists" value="true" /> 
     <property name="lineAggregator" ref="lineAggregator" /> 
    </bean> 

    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" /> 

回答

0

不,它不會。

我甚至會說,你的代碼將打破遲早的事。問題在於FlatFileItemReader(分別爲FlatFileItemWriter)的讀取(分別爲寫入方法)不是線程安全的。

如果你想異步使用它們,你需要實現一個包裝器ItemWriter和ItemReader,它將調用同步到FlatFileItemReader/Writer。

但是,當然,整個中間的重新啓動將不起作用,因爲如果僅使用FlatFileItemReader/Writer的標準實現,塊的順序不能保證。問題在於塊可能超過另一個塊,導致執行上下文中的讀取位置指針在被超越的塊之後移動。但是,如果被超越的塊失敗,那麼執行上下文中的位置將指示失敗的塊的條目已成功處理。

當然,你也可以實現一個適配器,在那裏你跟蹤處理的條目中,只有移動位置指針向前,當你知道你的邏輯,是此前的所有條目進行了處理,並已寫入。

+0

謝謝。我也這麼認爲......只是想確認一下。 –