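I'm trying to write messages to a JMS queue and then, in the next step, read them from the queue and write them to a database. The first part should be synchronous and the second one asynchronous. The JMS part is very slow: only about 1100 items reach the queue in 1 minute. The setup is Spring Batch writing to ActiveMQ.
This is my job: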
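To put that number in perspective, here is a quick back-of-the-envelope calculation. It is only a sketch of the arithmetic; the class and method names are made up for illustration and are not part of my batch configuration.

```java
import java.util.Locale;

class QueueThroughput {
    // Average time spent per item, in milliseconds.
    public static double millisPerItem(long items, long seconds) {
        return seconds * 1000.0 / items;
    }

    // Average number of items handled per second.
    public static double itemsPerSecond(long items, long seconds) {
        return (double) items / seconds;
    }

    public static void main(String[] args) {
        long items = 1100; // items that reached the queue (my measurement)
        long seconds = 60; // elapsed time of that measurement
        System.out.println(String.format(Locale.ROOT,
                "%.1f items/s, %.1f ms per item",
                itemsPerSecond(items, seconds), millisPerItem(items, seconds)));
    }
}
```

So each message costs roughly 55 ms on average, which is about 18 messages per second.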
@Bean
public Job multiThreadedStepJob() {
    Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end();
    Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end();
    Flow splitFlow = new FlowBuilder<Flow>("splitflow")
            .split(new SimpleAsyncTaskExecutor())
            .add(flow1, flow2)
            .build();
    return jobBuilders.get("multiThreadedStepJob")
            .start(splitFlow)
            .end()
            .build();
}
The first step:
@Bean
public Step step() {
    return stepBuilders.get("step")
            .<OrderDTO, OrderDTO>chunk(CHUNK_SIZE)
            .reader(reader())
            .writer(writer())
            .build();
}
The second step:
@Bean
public Step step2() {
    return stepBuilders.get("step2")
            .<OrderDTO, OrderDTO>chunk(100)
            .reader(reader2())
            .writer(writer2())
            .build();
}
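I think my mistake is in the writer of the first step or the reader of the second step, because I can run a different reader and writer together without any problem.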
@Bean
public JmsItemWriter<OrderDTO> writer() {
    JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>();
    itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
    return itemWriter;
}

@Bean
public JmsItemReader<OrderDTO> reader2() {
    JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>();
    itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
    itemReader.setItemType(OrderDTO.class);
    return itemReader;
}
Both use the same JmsTemplate to connect to the queue:
@Bean
public JmsTemplate jmsTemplate() {
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
    jmsTemplate.setDefaultDestination(queue());
    jmsTemplate.setReceiveTimeout(500); // milliseconds
    return jmsTemplate;
}

@Bean
public Queue queue() {
    return new ActiveMQQueue("orderList");
}

@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
    factory.setTrustAllPackages(true);

    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    prefetchPolicy.setQueuePrefetch(30);
    factory.setPrefetchPolicy(prefetchPolicy);

    PooledConnectionFactory pool = new PooledConnectionFactory(factory);
    pool.setMaxConnections(10);
    pool.setMaximumActiveSessionPerConnection(10);
    pool.setCreateConnectionOnStartup(true); // open a connection eagerly when the pool starts
    return pool;
}
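One thing I am unsure about is the receive timeout of 500 ms. As far as I understand, JmsItemReader hands off to the JmsTemplate's receive call, which returns null when no message arrives within the timeout, and Spring Batch treats a null item as the end of input. Because both flows start at the same time, step2 could see an empty queue early on. The sketch below is only an analogy built on a plain java.util.concurrent queue, not JMS; ReceiveTimeoutDemo and its receive method are hypothetical names used for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReceiveTimeoutDemo {
    // Stand-in for a timed receive: waits up to timeoutMillis and
    // returns null if nothing arrives in that window.
    public static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Empty queue: the timed receive gives up and returns null, which a
        // chunk-oriented reader would interpret as "no more items".
        System.out.println("empty queue -> " + receive(queue, 500));

        // Once a producer has put something on the queue, the same call returns it.
        queue.offer("order-1");
        System.out.println("after offer -> " + receive(queue, 500));
    }
}
```

If this analogy holds for my job, the reader in step2 would stop as soon as the writer in step falls more than 500 ms behind, but I have not verified that this is related to the slow writes.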
The rest of the configuration I use comes from @EnableBatchProcessing. Does anyone know why this is so slow?