2017-05-29 101 views
1

我嘗試將消息寫入JMS隊列,我在下一步中將其寫入數據庫。第一部分應該是第二個異步同步。 JMS部分非常慢(1分鐘內1100個項目到隊列中)。春天批量寫入ActiveMQ

這就是我的工作。

@Bean 
public Job multiThreadedStepJob() { 
    Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end(); 
    Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end(); 
    Flow splitFlow = new FlowBuilder<Flow>("splitflow") 
    .split(new SimpleAsyncTaskExecutor()).add(flow1, flow2) .build(); 

    return jobBuilders.get("multiThreadedStepJob") 
          .start(splitFlow).end().build(); 

} 

第一步:

@Bean 
public Step step() { 
    return stepBuilders.get("step") 
     .<OrderDTO, OrderDTO>chunk(CHUNK_SIZE) 
     .reader(reader()) 
     .writer(writer()) 
     .build(); 
} 

第二步:

@Bean 
public Step step2() { 
    return stepBuilders.get("step2") 
      .<OrderDTO, OrderDTO>chunk(100) 
      .reader(reader2()) 
      .writer(writer2()) 
      .build(); 
} 

我認爲我的錯誤是一步的作家和第二步的讀者裏面,因爲我可以運行另一位讀者和作家在一起,我沒有問題。

@Bean 
public JmsItemWriter<OrderDTO> writer() { 
    JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>(); 
    itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate()); 
    return itemWriter; 
} 

@Bean 
public JmsItemReader<OrderDTO> reader2() { 
    JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>(); 
    itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate()); 
    itemReader.setItemType(OrderDTO.class); 
    return itemReader; 
} 

它們使用相同的JmsTemplate的用於連接到隊列:

@Bean 
public JmsTemplate jmsTemplate() { 
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory()); 
    jmsTemplate.setDefaultDestination(queue()); 
    jmsTemplate.setReceiveTimeout(500); 
    return jmsTemplate; 
} 

@Bean 
public Queue queue() { 
    return new ActiveMQQueue("orderList"); 
} 

@Bean 
public ConnectionFactory connectionFactory() { 
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); 
    factory.setTrustAllPackages(true); 

    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 
    prefetchPolicy.setQueuePrefetch(30); 

    factory.setPrefetchPolicy(prefetchPolicy); 

    PooledConnectionFactory pool = new PooledConnectionFactory(factory); 
    pool.setMaxConnections(10); 
    pool.setMaximumActiveSessionPerConnection(10); 
    pool.isCreateConnectionOnStartup(); 

    return pool; 
} 

我使用的配置的其餘部分是從@EnableBatchProcessing配置。有誰知道爲什麼這會變得如此緩慢?

回答

1

顯然jmsTemplate.setSessionTransacted(true);真的很重要。這加速了JMS隊列的寫作和閱讀。由於某種原因,我認爲默認值是真實的,因爲我正在處理批次。

反正其他人有這個問題先檢查一下,因爲這很容易忘記。