2016-06-10 94 views
2

我已經使用kafka binder做了一些測試,看起來spring-cloud-stream生產者不參與spring管理的事務。spring-cloud-stream生產者事務性

鑑於這樣的代碼

@RequestMapping(method = RequestMethod.POST) 
    @Transactional 
    public Customer insertCustomer(@RequestBody Customer customer) { 
     customerDao.insertCustomer(customer); 
     source.output().send(MessageBuilder.withPayload(CustomerEventHelper.createSaveEvent(customer)).build()); 
     if (true) { 
      throw new RuntimeException("rollback test"); 
     } 
     return customer; 
    } 

的customerDao.insertCustomer呼叫被回滾,但仍然發出的卡夫卡消息。如果我的客戶事件中有消費者將客戶插入數據倉庫,則數據倉庫和記錄系統在轉換回滾時將不同步。有沒有辦法讓kafka活頁夾在這裏進行交易?

回答

2

卡夫卡活頁夾不是事務性的,卡夫卡不支持一般的交易。

我們打算解決Spring Cloud Stream 1.1的交易管理問題:https://github.com/spring-cloud/spring-cloud-stream/issues/536

但是,你甚至可以目前發送消息後,才成功提交通過註冊這樣的事務同步:

TransactionSynchronizationManager.registerSynchronization(
    new TransactionSynchronization(){ 
     void afterCommit(){      
      source.output().send(MessageBuilder.withPayload(event).build()); 
    if (true) { 

     } 
}); 

http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/transaction/support/TransactionSynchronization.html

+0

快速注意:因爲這並沒有使交易分散,數據庫更新可能成功,但發送失敗。因此,在這種情況下更常見的是通過單獨的服務處理它們來將消息從更新中分離出來。例如:服務A只發送消息,服務B接收消息並更新數據庫。只有在事務成功後才能確認消息,而不是確保消息發送和數據庫更新是原子操作。 –

+0

我認爲這對我複製數據庫更改的用例很好。如果記錄系統中的數據庫操作成功,我只想要消息。如果消息發送失敗,我不太關心回滾記錄操作系統。 – gadams00

+0

其實,沒關係,我想我明白你的觀點。我正在考慮在TransactionSynchronizationManager中使用這種方法,當我將消息生成添加到已使用@Transactional的代碼中時。 – gadams00