1

我正在使用Spring Integration 4.1.5並嘗試使用事務做些事情,但不幸的是我無法找到工作示例,也找不到工作示例。我正在嘗試設置一個正在查找消息的JMS輪詢器。一旦收到消息,服務激活器就會將一行插入到數據庫中,並將消息傳遞給另一個服務激活器。我想先做兩件,消息提取&數據庫插入事務。我不希望剩下的流程成爲事務性的。我使用Weblogic作爲應用程序容器,因此將使用WebLogicJtaTransactionManager。JMS Poller Transactional

我遇到的問題是我無法做出前兩件交易。它可以是全部,也可以是無。我嘗試了很多方法,但我覺得在輪詢上使用建議鏈是最好的選擇。我將能夠控制哪些方法將成爲交易的一部分。

我見過使用消息驅動監聽器的示例,但我使用的是Weblogic,並且將使用工作管理器,我相信我必須使用輪詢器才能利用工作管理器(如果情況並非如此,我想這是未來的另一個問題!)

我已經採取了XML並簡化了它,但除了編輯出包名,上下文產生了問題。

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:file="http://www.springframework.org/schema/integration/file" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp" 
    xmlns:int-xml="http://www.springframework.org/schema/integration/xml" 
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms" 
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/file 
    http://www.springframework.org/schema/integration/file/spring-integration-file.xsd 
    http://www.springframework.org/schema/integration/sftp 
    http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd 
    http://www.springframework.org/schema/integration/xml 
    http://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd 
    http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
    http://www.springframework.org/schema/tx 
    http://www.springframework.org/schema/tx/spring-tx.xsd 
    http://www.springframework.org/schema/aop 
    http://www.springframework.org/schema/aop/spring-aop.xsd"> 


    <bean id="jtaTransactionManager" 
     class="org.springframework.transaction.jta.WebLogicJtaTransactionManager"> 
     <property name="transactionManagerName" value="javax.transaction.TransactionManager" /> 
    </bean> 

    <bean id="insertMessageToDb" class="com.ReadMsgFromAxway" /> 
    <bean id="serviceActivator" class="com.CreateTMDFile" /> 

    <int-jms:inbound-channel-adapter id="jmsDefaultReceiver" 
     connection-factory="inboundDefaultAdaptorConnectionFactory" 
     extract-payload="false" destination="inboundAdaptorDefaultListenerQueue" 
     channel="inboundJMS" acknowledge="transacted"> 
     <int:poller id="poller" 
      max-messages-per-poll="100" fixed-rate="10"> 
      <int:advice-chain> 
       <ref bean="txAdvice" /> 
      </int:advice-chain> 
     </int:poller> 
    </int-jms:inbound-channel-adapter> 


    <tx:advice id="txAdvice" transaction-manager="jtaTransactionManager"> 
     <tx:attributes> 
      <tx:method name="processMessage" propagation="REQUIRED"/> 
     </tx:attributes> 
    </tx:advice> 

    <aop:config> 
     <aop:pointcut id="txOperation" 
      expression="execution(* axway.ReadMsgFromAxway.processMessage(..))"/> 
     <aop:advisor advice-ref="txAdvice" pointcut-ref="txOperation" /> 
    </aop:config> 

    <int:service-activator input-channel="inboundJMS" 
     output-channel="serviceActivatorChannel" ref="insertMessageToDb" method="processMessage" /> 

    <int:chain input-channel="serviceActivatorChannel" output-channel="nullChannel"> 
     <int:service-activator ref="serviceActivator" /> 
    </int:chain> 

</beans> 

ReadMsgFromAxway.java

public Message<File> processMessage(Message<?> message) { 
//Insert into DB 
     trackerProcess.insertUpdateMessageTracker(message, "axwayChannel", 
       "axwayChannel", currentStatusID, null, null); 
     count++; 
     int mod = count % 2; 
     if (mod != 0) { 
      // pass every 2 
      String hello = "hey"; 
     } else { 
      throw new RuntimeException("Testing transactional"); 
     } 

     Message<File> springMessage = MessageBuilder.createMessage(payloadFile, 
       messageHeaders); 
     return springMessage; 
    } 

的XML原樣不進行任何操作,運行時是否引發異常,或有異常在下次定期服務活化劑成分拋出。

如果我改變的建議屬性

 <tx:method name="*" propagation="REQUIRED"/> 

然後,在第一和第二服務激活的異常導致回滾。

奇怪的是如果我這樣做

 <tx:method name="processMessage" propagation="REQUIRED"/> 
     <tx:method name="*" propagation="NEVER"/> 

然後在第一個服務激活,或第二激活運行時是否引發異常,該消息被回滾。我認爲切入點會限制哪一類會導致交易,但我可能會誤解某些東西。

另一個注意事項 - 這個應用程序被安置在一個耳朵文件與其他幾場戰爭。此上下文啓動整個入站過程,並通過JMS隊列連接到另一個包含業務邏輯的war。在使用方法name =「*」的場景中,我看到了業務邏輯戰爭中的異常,導致原始入站消息的JMS消息也回滾。我的印象是,第二場戰爭將在另一個線程中進行處理,因爲它通過隊列接收消息,因此不會成爲交易的一部分。這可能是容器管理的JTA的副作用嗎?

謝謝!

回答

0

我建議您閱讀關於交易的Dave Syer的article,您可以在「更多像這樣」中找到鏈接。

現在看起來您完全不瞭解交易和AOP。您應該更加關注Spring Framework中的AOP支持。

一般而言,聲明式交易(方法上的@Transactional)是AOP通知的特定情況。任何AOP背後的主要概念都是調用堆棧的邊界,通過方法調用產生,我們爲其指定一個建議。

但是,如果在目標對象中沒有這樣的方法,它將不會被通知幷包裝到AOP代理。像processMessage的情況一樣,當您將txAdvice應用於org.springframework.messaging.Message.MessageSource周圍的某個內部對象時,JmsDestinationPollingSource的合同就是<int-jms:inbound-channel-adapter>

相反,您可以使用<transactional>配置<poller>。正確的:所有的流量都會被交易覆蓋。

要在這種情況下完成交易,該內部

Callable<Boolean> pollingTask = new Callable<Boolean>() { 

     @Override 
     public Boolean call() throws Exception { 
      return doPoll(); 
     } 
    }; 

周圍和handleMessage(),你應該只完成方法調用作爲常規@Transactional情況。爲此,我們應該把下一個消息處理到不同的線程。僅僅因爲默認情況下所有的<channel> s都是DirectChannel並與當前調用堆棧綁定。

爲此,您可以使用ExecutorChannel<int:dispatcher task-executor="threadPoolExecutor"/>)作爲您的serviceActivatorChannel

從那裏你不需要其餘的AOP配置。

不確定您的第二個問題是否適用於其他網絡應用程序。看起來它有一些邏輯來回應它的工作,以防從你身邊出現一些不一致的情況。但無論如何,這看起來像一個不同的問題。

+0

感謝@ Artem-bilan的迴應。我回頭看看使用並讓它工作。查看一些文檔,我需要在Weblogic的數據源中啓用2階段提交。 –

+0

我提交了我的評論過早,並沒有足夠的編輯速度......看起來像是從測試和你的建議,輪詢器中的 doesn不創造,也不傳播交易?我已經放置了,並收到一個沒有事務的錯誤。謝謝。 –

+0

??? 'MANDATORY'不會創建新的交易。在'TransactionDefinition'中看到它的JavaDoc。在大多數情況下,你應該使用默認的'REQUIRED'。從另一方面來說,你不需要'txAdvice'來代替''。有'' –