2015-04-08 74 views
0

我要實現以下用例在春季批處理作業:處理多個StoredProcedureItemReader調用

  1. 通過列表,經由StoredProcedureItemReader
  2. 迭代閱讀供應商的名單,並呼籲另一StoredProcedureItemReader對每個供應商(如輸入參數)。
  3. 第二個SP的輸出將被寫入CSV。

我想出了以下策略:

  1. 步驟開始
  2. SP ItemReader返回供應商的名單。
  3. 在ItemWriter,提供者保存到ExecutionContext
  4. 步驟1結束
  5. 步驟2開始
  6. 另一SP ItemReader從ExecutionContext
  7. 訪問提供商另一個ItemWriter寫入使用FlatFileItemWriter
  8. 到CSV的響應

我無法理解第二個SP ItemReader將如何訪問由第一個SP ItemReader返回的提供者列表。分區會在這裏幫助嗎?另外,有沒有更好的策略來完成這個?

EDIT 1:

下面是原來實行ItemProcessor中的(如富集):

@Scope("step") 
public class FetchReportFromProviderProcessor implements 
     ItemProcessor<RiscProvider, List<SogReportRecord>>, ItemStream { 

    StoredProcedureItemReader<SogReportRecord> reader = new StoredProcedureItemReader<SogReportRecord>(); 
    private DataSource dataSource; 

    @Value("#{jobParameters['date']}") 
    private String date; 

    @Override 
    public List<SogReportRecord> process(final RiscProvider item) throws Exception { 

     SogReportRecord record = null; 
     List<SogReportRecord> records = new ArrayList<SogReportRecord>(); 

     SqlParameter[] sqlParameters = new SqlParameter[] {new SqlParameter(OracleTypes.CURSOR)};   

     reader.setParameters(sqlParameters); 
     reader.setPreparedStatementSetter(new PreparedStatementSetter() { 

      @Override 
      public void setValues(PreparedStatement ps) throws SQLException { 
       ps.setString(0, item.getPrefix()); 
       ps.setString(1, date); 
      } 
     }); 

     while((record = reader.read()) != null) { 
      records.add(record); 
     } 

     return records; 
    } 

    public DataSource getDataSource() { 
     return dataSource; 
    } 

    public void setDataSource(DataSource dataSource) { 
     this.dataSource = dataSource; 
    } 

    @Override 
    public void open(ExecutionContext executionContext) 
      throws ItemStreamException { 
     reader.setDataSource(dataSource); 
     reader.setProcedureName("RISC_GET_DAYMOVEINOUT"); 
     reader.open(executionContext);  
    } 

    @Override 
    public void update(ExecutionContext executionContext) 
      throws ItemStreamException { 
     reader.update(executionContext);   
    } 

    @Override 
    public void close() throws ItemStreamException { 
     reader.close();  
    } 

} 

和XML部分:

<batch:job id="SOG_MOVEINOUT_REPORT_GENERATOR"> 
    <batch:step id="GET_REPORTS"> 
     <batch:tasklet> 
      <batch:chunk reader="getProviders" 
       processor="fetchRecordsFromProvider" 
       writer="sogReportWriter" commit-interval="500" /> 
     </batch:tasklet> 
    </batch:step> 
</batch:job> 

<!-- Reader to fetch list of providers --> 
<bean id="getProviders" class="org.springframework.batch.item.database.StoredProcedureItemReader"> 
    <property name="dataSource" ref="dataSource" /> 
    <property name="procedureName" value="RISC_GET_PROVIDER" /> 
    <property name="parameters"> 
     <list> 
      <bean class="org.springframework.jdbc.core.SqlOutParameter"> 
       <constructor-arg index="0" value="providers" /> 
       <constructor-arg index="1"> 
        <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR" /> 
       </constructor-arg> 
      </bean> 
     </list> 
    </property> 
    <property name="refCursorPosition" value="1" /> 
    <property name="rowMapper"> 
     <bean class="com.kpn.risc.ProviderRowMapper" /> 
    </property> 
</bean> 

<bean id="fetchRecordsFromProvider" class="com.kpn.risc.FetchReportFromProviderProcessor"> 
    <property name="dataSource" ref="dataSource" /> 
</bean> 

<bean id="sogReportWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step"> 
    <property name="resource" value="file:///${batch.job.report.dir}/report-#{stepExecutionContext['provider']}.csv" /> 
    <property name="lineAggregator"> 
     <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator"> 
      <property name="fieldExtractor"> 
       <bean class="org.springframework.batch.item.file.transform.PassThroughFieldExtractor" /> 
      </property> 
     </bean> 
    </property> 
</bean> 

在上面的代碼中,處理器是將其工作委託給SP ItemReader。但是在調用read()方法之前,讀者無法正確初始化。 ItemProcessor內部是否可以或建議撥打ItemReader

回答

0

您所描述的內容直接落入批處理的駕駛查詢模式。從本質上講,查詢定義了另一個查詢用來驅動查詢的id(或在您的案例提供者中)。通常,ItemReader會讀取ID並將每個ID傳遞給ItemProcessor以進行富集(「其他」查詢)。然後將結果傳遞給ItemWriter

您可以在Spring Batch的文檔,這裏的公共批處理模式部分閱讀更多關於驅動查詢模式:http://docs.spring.io/spring-batch/trunk/reference/html/patterns.html

+0

感謝您的答覆邁克爾。我的第一個想法是把'ItemProcessor'當成豐富。我試着將處理器委託給一個'StoredProcedureItemReader'。 (有問題的代碼)。但是我的閱讀器需要傳遞給處理器的提供者。在調用'start()'方法之前,我如何將它傳遞給讀者? – xsreality

+0

我假定你的意思是在你委託的ItemReader上打開(ExecutionContext上下文)方法。由於您的閱讀器沒有作爲閱讀器在Spring Batch中註冊,所以我們不會自動爲您調用「ItemStream」生命週期方法。要註冊您的委託閱讀器(這將導致這些方法被調用),您需要手動將閱讀器註冊爲一個流。如何做到這一點的文檔可以在這裏的5.1.9節中找到:http://docs.spring.io/spring-batch/reference/html/configureStep.html –

+0

好吧,我已經配置委託閱讀器作爲一個流。但是委託閱讀器需要提供者作爲存儲過程的輸入參數來初始化自己。我在處理器的'process'方法中有提供者。我如何將它傳遞給委託閱讀器? – xsreality