2015-04-28 262 views
2

是否可以在單個應用程序中使用這兩個啓動器?spring-boot-starter-jta-atomikos和spring-boot-starter-batch

我想將CSV文件中的記錄加載到數據庫表中。 Spring Batch表存儲在不同的數據庫中,所以我假設我需要使用JTA來處理事務。

無論何時將@EnableBatchProcessing添加到我的@Configuration類中,它都會配置一個PlatformTransactionManager,這會阻止Atomikos自動配置它。

是否有任何彈簧引導+批+ jta樣本顯示如何做到這一點?

非常感謝, 詹姆斯

回答

1

我只是通過這個去了,我發現的東西,似乎工作。正如您所注意到的,@EnableBatchProcessing會導致創建DataSourceTransactionManager,這會混淆一切。我在@EnableBatchProcessing中使用了modular = true,所以ModularBatchConfiguration類被激活。

我所做的就是停止使用@EnableBatchProcessing,而是將整個ModularBatchConfiguration類複製到我的項目中。然後我註釋掉了transactionManager()方法,因爲Atomikos配置創建了JtaTransactionManager。我也不得不重寫jobRepository()方法,因爲它被硬編碼爲使用在DefaultBatchConfiguration內創建的DataSourceTransactionManager。我也不得不明確導入JtaAutoConfiguration類。這將電線正確連接(根據執行器的「豆」端點 - 感謝上帝)。但是當你運行它時,事務管理器會拋出一個異常,因爲某處某處設置了明確的事務隔離級別。所以我也寫了一個BeanPostProcesso r找到交易管理器並打電話給txnMgr.setAllowCustomIsolationLevels(true);

現在一切正常,但作業正在運行時,即使我能看到SQLYog中的數據,我也無法使用JdbcTemplate從batch_step_execution表中獲取當前數據。這必須與事務隔離有關,但我還沒有能夠理解它。

這是我的配置類,從Spring複製並修改如上所述。 PS,我有我的DataSource指向批註表註釋爲@Primary的數據庫。此外,我將我的DataSource豆類更改爲org.apache.tomcat.jdbc.pool.XADataSource;我不確定是否有必要。

@Configuration 
@Import(ScopeConfiguration.class) 
public class ModularJtaBatchConfiguration implements ImportAware 
{ 
    @Autowired(required = false) 
    private Collection<DataSource> dataSources; 

    private BatchConfigurer configurer; 

    @Autowired 
    private ApplicationContext context; 

    @Autowired(required = false) 
    private Collection<BatchConfigurer> configurers; 

    private AutomaticJobRegistrar registrar = new AutomaticJobRegistrar(); 

    @Bean 
    public JobRepository jobRepository(DataSource batchDataSource, JtaTransactionManager jtaTransactionManager) throws Exception 
    { 
     JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); 
     factory.setDataSource(batchDataSource); 
     factory.setTransactionManager(jtaTransactionManager); 
     factory.afterPropertiesSet(); 
     return factory.getObject(); 
    } 

    @Bean 
    public JobLauncher jobLauncher() throws Exception { 
     return getConfigurer(configurers).getJobLauncher(); 
    } 

// @Bean 
// public PlatformTransactionManager transactionManager() throws Exception { 
//  return getConfigurer(configurers).getTransactionManager(); 
// } 

    @Bean 
    public JobExplorer jobExplorer() throws Exception { 
     return getConfigurer(configurers).getJobExplorer(); 
    } 

    @Bean 
    public AutomaticJobRegistrar jobRegistrar() throws Exception { 
     registrar.setJobLoader(new DefaultJobLoader(jobRegistry())); 
     for (ApplicationContextFactory factory : context.getBeansOfType(ApplicationContextFactory.class).values()) { 
      registrar.addApplicationContextFactory(factory); 
     } 
     return registrar; 
    } 

    @Bean 
    public JobBuilderFactory jobBuilders(JobRepository jobRepository) throws Exception { 
     return new JobBuilderFactory(jobRepository); 
    } 

    @Bean 
    // hopefully this will autowire the Atomikos JTA txn manager 
    public StepBuilderFactory stepBuilders(JobRepository jobRepository, JtaTransactionManager ptm) throws Exception { 
     return new StepBuilderFactory(jobRepository, ptm); 
    } 

    @Bean 
    public JobRegistry jobRegistry() throws Exception { 
     return new MapJobRegistry(); 
    } 

    @Override 
    public void setImportMetadata(AnnotationMetadata importMetadata) { 
     AnnotationAttributes enabled = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(
       EnableBatchProcessing.class.getName(), false)); 
     Assert.notNull(enabled, 
       "@EnableBatchProcessing is not present on importing class " + importMetadata.getClassName()); 
    } 

    protected BatchConfigurer getConfigurer(Collection<BatchConfigurer> configurers) throws Exception { 
     if (this.configurer != null) { 
      return this.configurer; 
     } 
     if (configurers == null || configurers.isEmpty()) { 
      if (dataSources == null || dataSources.isEmpty()) { 
       throw new UnsupportedOperationException("You are screwed"); 
      } else if(dataSources != null && dataSources.size() == 1) { 
       DataSource dataSource = dataSources.iterator().next(); 
       DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(dataSource); 
       configurer.initialize(); 
       this.configurer = configurer; 
       return configurer; 
      } else { 
       throw new IllegalStateException("To use the default BatchConfigurer the context must contain no more than" + 
                 "one DataSource, found " + dataSources.size()); 
      } 
     } 
     if (configurers.size() > 1) { 
      throw new IllegalStateException(
        "To use a custom BatchConfigurer the context must contain precisely one, found " 
          + configurers.size()); 
     } 
     this.configurer = configurers.iterator().next(); 
     return this.configurer; 
    } 

} 

@Configuration 
class ScopeConfiguration { 

    private StepScope stepScope = new StepScope(); 

    private JobScope jobScope = new JobScope(); 

    @Bean 
    public StepScope stepScope() { 
     stepScope.setAutoProxy(false); 
     return stepScope; 
    } 

    @Bean 
    public JobScope jobScope() { 
     jobScope.setAutoProxy(false); 
     return jobScope; 
    } 

} 
+0

最後,即使這並沒有爲我工作。如果沒有讓Atomikos JTA Txn Mgr瘋狂並鎖定並殺死我所有的工作,我無法查詢數據庫。然後我意識到我的第二個數據源只讀取一個作業,所以我將所有配置恢復爲標準的非JTA配置,完全取出Atomikos,並創建了第二個只讀數據源作爲Tomcat DataSource池bean,其中autoCommit =只有在特定工作啓動時纔是真實的。 –

0

我找到了一個解決方案,我能夠保持@EnableBatchProcessing但不得不實施BatchConfigurer和Atomikos公司豆類,看到我的完整的答案在這so answer