2017-02-03 27 views

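Spring Batch: reading from a JMS queue, the step never ends

I have a simple batch job that reads from a JMS queue (ActiveMQ) and writes to a file. The job runs as expected and writes to the file according to the commit interval, which is set to 10,000.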

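I have two observations:

  1. The batch job that reads from the queue never ends.

  2. All messages in the queue get consumed, but the last chunk is written to the file only when new messages are pushed to the JMS queue and the commit interval is met.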

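Is this the expected behavior? I want to schedule the batch job so that it consumes and writes all the messages that are in the queue at that point in time. Any suggestions?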

@Autowired 
private JobBuilderFactory jobBuilderFactory; 

@Bean 
public TransactionAwareConnectionFactoryProxy activeMQConnectionFactory() { 
    ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); 
    TransactionAwareConnectionFactoryProxy activeMQConnectionFactory = new TransactionAwareConnectionFactoryProxy(amqConnectionFactory); 
    return activeMQConnectionFactory; 
} 

@Bean 
public ActiveMQQueue defaultQueue() { 
    return new ActiveMQQueue("firstQueue"); 
} 

@Bean 
public PlatformTransactionManager transactionManager() { 
    return new ResourcelessTransactionManager(); 
} 

@Bean 
public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception { 
    return new MapJobRepositoryFactoryBean(transactionManager).getObject(); 
} 

@Bean 
@DependsOn("jobRepository") 
public SimpleJobLauncher simpleJobLauncher(JobRepository jobRepository) { 
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher(); 
    simpleJobLauncher.setJobRepository(jobRepository); 
    return simpleJobLauncher; 
} 

If I set receiveTimeout to a small number, not all messages are consumed, so I set it to the maximum value.

@Bean 
@DependsOn(value = { "activeMQConnectionFactory", "defaultQueue" }) 
public JmsTemplate firstQueueTemplate(ActiveMQQueue defaultQueue, TransactionAwareConnectionFactoryProxy activeMQConnectionFactory) { 
    JmsTemplate firstQueueTemplate = new JmsTemplate(activeMQConnectionFactory); 
    firstQueueTemplate.setDefaultDestination(defaultQueue); 
    firstQueueTemplate.setSessionTransacted(true); 
    firstQueueTemplate.setReceiveTimeout(Long.MAX_VALUE); 
    return firstQueueTemplate; 
} 

Batch job configuration:

@Bean 
public JmsItemReader<String> jmsItemReader(JmsTemplate firstQueueTemplate) { 
    JmsItemReader<String> jmsItemReader = new JmsItemReader<>(); 
    jmsItemReader.setJmsTemplate(firstQueueTemplate); 
    jmsItemReader.setItemType(String.class); 
    return jmsItemReader; 
} 


@Bean 
public ItemWriter<String> flatFileItemWriter() { 
    FlatFileItemWriter<String> writer = new FlatFileItemWriter<>(); 
    writer.setResource(new FileSystemResource("/mypath/output.csv")); 
    writer.setLineAggregator(new PassThroughLineAggregator<String>()); 
    return writer; 
} 

@Bean 
@DependsOn(value = { "jmsItemReader", "flatFileItemWriter", "jobRepository", "transactionManager" }) 
public Step queueReaderStep(JmsItemReader<String> jmsItemReader, ItemWriter<String> flatFileItemWriter, JobRepository jobRepository, 
     PlatformTransactionManager transactionManager) throws Exception { 
    StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager); 
    AbstractTaskletStepBuilder<SimpleStepBuilder<String, String>> step = stepBuilderFactory.get("queueReaderStep").<String, String> chunk(10000) 
      .reader(jmsItemReader).writer(flatFileItemWriter); 
    return step.build(); 
} 


@Bean 
@DependsOn(value = { "jobRepository", "queueReaderStep" }) 
public Job jsmReaderJob(JobRepository jobRepository, Step queueReaderStep) { 
    return this.jobBuilderFactory.get("jsmReaderJob").repository(jobRepository).incrementer(new RunIdIncrementer()) 
      .flow(queueReaderStep).end().build(); 
} 

Answer


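The JmsItemReader provided by Spring Batch is really meant more as a template or example because, as you noted, it never returns null, so the step never ends. You need to write something that treats a given message as the signal that the step is complete.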
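One way to sketch that idea: wrap the message source in a reader whose read() returns null, which is how a Spring Batch ItemReader signals the end of input, as soon as it sees an agreed end marker or an empty receive. The block below is a minimal, framework-free illustration, not the Spring Batch API itself. The class name TerminatingReader, the END_MARKER value, and the use of a Supplier<String> to stand in for a call such as jmsTemplate.receiveAndConvert() are all assumptions made for this example.

```java
import java.util.function.Supplier;

/**
 * Hypothetical wrapper that ends the input stream when it sees a sentinel
 * message or when the underlying source has nothing to deliver.
 */
class TerminatingReader {

    // Assumed end-of-batch marker; the producer would have to send it explicitly.
    static final String END_MARKER = "__END_OF_BATCH__";

    // Stands in for a blocking receive with a finite timeout
    // (for example a JmsTemplate call); returns null when nothing arrived.
    private final Supplier<String> source;

    // Once finished, the source is not polled again.
    private boolean finished = false;

    TerminatingReader(Supplier<String> source) {
        this.source = source;
    }

    /** Mirrors the ItemReader.read() contract: null means "no more items". */
    String read() {
        if (finished) {
            return null;
        }
        String message = source.get();
        if (message == null || END_MARKER.equals(message)) {
            finished = true;   // flush the last partial chunk and end the step
            return null;
        }
        return message;
    }
}
```

In a real job the same logic would presumably live in a class implementing ItemReader<String> and delegating to the JmsTemplate. Because the final read() returns null, the last partial chunk is written even when it holds fewer than 10,000 items, which would also address the second observation in the question.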
