2015-04-06 35 views
0

我正在使用spring批處理模塊處理目錄中的csv文件。目錄可能包含具有特定擴展名的多個文件,我正在使用MultiResourceItemReader來讀取這些文件。 作業將接收3個作業參數,分別爲read_from_directory,move_to_directory和default_user_id。所有這些參數對於所有作業運行將保持相同。 read_from_directory將包含 多個csv文件,作業應該一個接一個地處理這些文件。我面臨的問題是因爲作業參數是相同的,我得到JobInstanceAlreadyCompleteException當第二次運行作業。我知道這個問題可以通過使用額外的時間戳參數來使作業參數唯一來解決。但是由於添加時間戳參數會使每個作業實例都是唯一的,我不希望使用這種方法,因爲它會在使作業可重新啓動時產生問題。 所以我想一些建議,spring批處理 - multiResourceItemReader:如何使作業參數唯一併重新啓動作業

  1. 我怎樣才能使各項工作實例是唯一不使用時間戳參數。

  2. 在這種情況下如何使作業可重新啓動?加入'restartable =「true」'就足夠了,還是需要一些額外的配置/編碼。我有點困惑,因爲作業會從目錄中讀取多個文件。所以,如果一項工作失敗,例如,由於在一個文件中記錄不正確,我怎麼能從它離開的地方重新開始相同的工作?我已經將作業配置爲在一段時間之後使用調度程序定期運行。因此,如果工作失敗,然後我糾正csv文件中的錯誤,工作將從下次運行時停止的地方開始?

請從我的配置下面相關部分找到:

<batch:job id="testJob" restartable="true"> 
     <batch:step id="step1"> 
      <batch:tasklet> 
       <batch:chunk reader="multiResourceItemReader" writer="fileWriter" 
        commit-interval="1"> 
       </batch:chunk> 
      </batch:tasklet> 
     </batch:step> 
    </batch:job> 

    <bean id="fileWriter" class="com.ivish.TestFileWriter" /> 
    <bean id="multiResourceItemReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step"> 
     <property name="resources" value="file:#{jobParameters['read_from_directory']}/*.csv" /> 
     <property name="delegate" ref="fileReader" /> 
    </bean> 

    <bean id="fileReader" class="com.ivish.TestFileReader" scope="step"> 
     <property name="delegate" ref="delegateFileReader" /> 
     <property name="moveToDirectory" value="#{jobParameters['move_to_directory']}" /> 
    </bean> 
    <bean id="delegateFileReader" class="org.springframework.batch.item.file.FlatFileItemReader"> 
     <property name="lineMapper"> 
      <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper"> 
       <property name="lineTokenizer" ref="fileTokenizer" /> 
       <property name="fieldSetMapper"> 
        <bean 
         class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper" /> 
       </property> 
      </bean> 
     </property> 
    </bean>   

謝謝。

回答

0

問題1:你可以用你自己的邏輯實現JobParameterIncrementer下一個實例是什麼,以及你想如何增加參數。然後,根據您的邏輯,您可以運行下一個作業實例,以便運行下一個實例,否則,只需運行即可重新啓動最新的實例。如果您開始使用CommandLineJobRunner,則可以通過-next運行下一個實例,如果以編程方式執行此操作,則可以使用JobOperator#startNextInstance(String jobName)JobParameterIncrementer這裏是example

問題2:對於重新啓動加入restartable="true"應該做的伎倆。 FlatFileItemReader這是代理讀取文件擴展AbstractItemCountingItemStreamItemReader它保存讀取文件時的狀態。至於MultiResourceItemReader你可以documentation看到,它說:

輸入資源使用setComparator(比較),以確保資源的排序工作之間保持有序運行在重啓方案。

所以這意味着資源列表是有序的,並且每個都被委託給FlatFileItemReader,它保留了運行之間的順序和計數。

1

Spring Batch有兩個截然不同的概念,涉及作業「運行」,JobInstanceJobExecution

JobInstance是邏輯運行的概念。它由一套獨特的識別工作參數來識別。在你的例子中,對於read_from_directory,move_to_directory和default_user_id的每個組合,我希望有一個JobInstance

其他的概念是JobExecution。這代表了一次物理運行。例如,如果運行read_from_directory,move_to_directory和default_user_id的組合,並且它通過,則JobInstance將有一個孩子JobExecution。但是,如果第一次嘗試(第一次JobExecution)失敗,則可以重新啓動該作業。重新啓動將在現有的JobInstance(一次邏輯運行下的兩次物理運行)下創建新的JobExecution

考慮到上述情況,每個JobInstance將通過read_from_directory,move_to_directory,default_user_id的組合獨特的,某種(Spring Batch的的運行ID提供基於一個計數器開箱即用,也可以使用時間戳)。

您可以將文檔在這裏閱讀更多關於的JobInstanceJobExecution的概念:http://docs.spring.io/spring-batch/trunk/reference/html/domain.html#domainJob

相關問題