我正在使用自定義Iteamreader bean從MongoDB讀取數據。我的閱讀器按照reader中定義的pageSize(50)返回數據。但是處理器只能從50開始獲得31行數據。我嘗試了各種塊大小,但是一些處理器只獲得了前31行。spring批處理:itemprocessor未獲取讀取器讀取的所有數據
請幫我找到了這個錯誤......我想聽衆,但沒能找到問題..
---- XML配置----
<?xml version="1.0" encoding="UTF-8"?>
<context:property-placeholder location="classpath:application.properties" />
<context:component-scan base-package="com.xxx.yyy.batch.kernel" />
<context:component-scan base-package="com.xxx.yyy.batch.dao" />
<context:annotation-config />
<!-- Enable Annotation based Declarative Transaction Management -->
<tx:annotation-driven proxy-target-class="true"
transaction-manager="transactionManager" />
<!-- Creating TransactionManager Bean, since JDBC we are creating of type
DataSourceTransactionManager -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<batch:job id="txnLogJob" job-repository="jobRepository"
restartable="true">
<batch:step id="txnload">
<tasklet allow-start-if-complete="true">
<chunk reader="txnLogItemReader" writer="txnLogItemWriter"
processor="txnLogProcessor" commit-interval="20" />
</tasklet>
</batch:step>
<batch:listeners>
<batch:listener ref="completionListener" />
</batch:listeners>
</batch:job>
<bean id="completionListener"
class="com.xxx.yyy.batch.listeners.JobCompletionNotificationListener" />
<bean id="jobParametersDAOImpl" class="com.xxx.yyy.batch.dao.JobParametersDAOImpl" />
<bean id="batchLoader" class="com.xxx.yyy.batch.kernel.BatchLoader" />
<bean id="batchjobParameter" class="com.xxx.yyy.batch.dao.Batch_Job_Parameters" />
<bean id="txnLogItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter"
scope="step">
<property name="shouldDeleteIfExists" value="true" />
<property name="resource" value="file:target/test-outputs/output.txt" />
<property name="lineAggregator">
<bean
class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" />
</property>
</bean>
<bean id="txnLogProcessor"
class="com.xxx.yyy.batch.processor.MessageContextItemProcessor" />
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="databaseType" value="MYSQL" />
<property name="dataSource" ref="dataSource" />
<property name="transactionManager" ref="transactionManager" />
</bean>
<bean id="dataSource" class="com.xxx.yyy.common.DataSource"
destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
<property name="connectionProperties" value="${jdbc.connectionProperties}" />
<property name="initialSize" value="${jdbc.initialSize}" />
<property name="maxTotal" value="${jdbc.maxTotal}" />
<property name="maxIdle" value="${jdbc.maxIdle}" />
<property name="minIdle" value="${jdbc.minIdle}" />
<property name="maxWaitMillis" value="${jdbc.maxWaitMillis}" />
<property name="testOnBorrow" value="${jdbc.testOnBorrow}" />
<property name="testWhileIdle" value="${jdbc.testWhileIdle}" />
<property name="testOnReturn" value="${jdbc.testOnReturn}" />
<property name="validationQuery" value="${jdbc.validationQuery}" />
</bean>
</beans>
定製讀者豆:
@Bean
public MongoItemReader<MessageContext> txnLogItemReader() {
MongoItemReader<MessageContext> reader = new MongoItemReader<MessageContext>();
reader.setPageSize(50);
reader.setCollection("txnlog");
reader.setTemplate(mongoTemplate);
String query = null ;
query = "{ \"audit_info.created_on\": { $gt: { \"$date\" : ?0 }, $lte: { \"$date\" : ?1 } }, "
+ "$and: [ { \"processing_status\": { $in: [?2] } } ] }" ;
reader.setQuery(query);
//Timestamp to_date_timestamp = jobParametersDAOImpl.getCurrentTimeStamp() ;
Batch_Job_Parameters job_param = jobParametersDAOImpl.getBatchJobParameters() ;
String from_date = job_param.getFrom_date().toString() ;
String [] splitstr = from_date.split(" ") ;
from_date = splitstr[0]+"T"+splitstr[1]+"00Z" ;
String to_date = job_param.getTo_date().toString() ;
splitstr = to_date.split(" ") ;
to_date = splitstr[0]+"T"+splitstr[1]+"00Z" ;
List<Object> parameterValues = new ArrayList<Object>() ;
parameterValues.add(from_date) ;
parameterValues.add(to_date) ;
parameterValues.add(job_param.getTxnlog_status_list()) ;
reader.setParameterValues(parameterValues);
reader.setTargetType(com....MessageContext.class);
Map<String,Direction> sorts = new HashMap<String,Direction>() ;
sorts.put("audit_info.created_on", org.springframework.data.domain.Sort.Direction.ASC) ;
reader.setSort(sorts);
return reader;
}
MongoDB包含超過500行,正如我前面提到的,我可以看到所有正在讀取的50行。 –
MongoDB包含超過500行,我可以看到正在讀取的所有50行。我將偵聽程序(步,塊,讀)放入配置中。聽者的 輸出是: 「步驟之前」,「前塊」 「之前讀」 50行數據 「讀出之後」 19次: 「之前讀」 「之後讀出」 20行數據的「前塊」 10倍處理器和寫入器處理 「塊後」: 「之前讀」 「讀取後」,「塊之後」 「之前讀」 10行由處理器和作家 處理的數據的「步驟後」 –