2017-10-17 148 views
0

我有訂閱該主題的JMS入站端點。一旦有消息轉換器將有效負載拆分成記錄列表,然後使用批量提交將其插入數據庫中。如果在插入數據庫時​​出現任何錯誤,我想將整個有效負載回滾到JMS。如何使用交易來實現這一點?mule:批處理中的事務

<batch:job name="lockboxBatch" max-failed-records="-1"> 
     <batch:input> 
      <jms:inbound-endpoint topic="lockbox" connector-ref="Active_MQ1" doc:name="JMS"> 
      </jms:inbound-endpoint> 
      <custom-transformer class="transformers.PaymentsTransformer" doc:name="Java"/> 
      <logger level="INFO" doc:name="Logger"/> 
     </batch:input> 
     <batch:process-records> 
      <batch:step name="Batch_Step"> 
       <expression-component doc:name="Expression"><![CDATA[payload[2].batchAmount='hghghfghfhgf']]></expression-component> 
       <batch:commit size="4" doc:name="Batch Commit"> 
        <db:insert config-ref="Oracle_Configuration" doc:name="Database" bulkMode="true" > 
         <db:parameterized-query><![CDATA[INSERT INTO TIB_INT_AR_PAYMENT_IFACE (TRANSMISSION_REQUEST_ID,DESTINATION_ACCOUNT,ORIGINATION,TRANSMISSION_RECORD_COUNT,TRANSMISSION_AMOUNT,LOCKBOX_NUMBER,LOCKBOX_BATCH_COUNT,LOCKBOX_RECORD_COUNT,LOCKBOX_AMOUNT,BATCH_NAME,BATCH_AMOUNT,BATCH_RECORD_COUNT,ITEM_NUMBER,CURRENCY_CODE,REMITTANCE_AMOUNT,TRANSIT_ROUTING_NUMBER,ACCOUNT,CHECK_NUMBER,CUSTOMER_NUMBER,OVERFLOW_INDICATOR,OVERFLOW_SEQUENCE,INVOICE1,AMOUNT_APPLIED1) VALUES (#[payload.?transmissiosnRequestID],#[payload.?destinastionAccount],#[payload.?origination],#[payload.?transmissionSrecordCount],#[payload.?transmisssionAmount],#[payload.lockboxNumber],#[payload.lockboxBatchCount],#[payload.lockboxRecordCount],#[payload.lockboxAmount],#[payload.batchName],#[payload.batchAmount],#[payload.batchRecordCount],#[payload.itemNumber],#[payload.currencyCode],#[payload.remittanceAmount],#[payload.transitRoutingNumber],#[payload.account],#[payload.checkNumber],#[payload.customerNumber],#[payload.overflowIndicator],#[payload.overflowSequence],#[payload.invoice1],#[payload.amountApplied1])]]></db:parameterized-query> 
        </db:insert> 
       </batch:commit> 
      </batch:step> 
      <batch:step name="Batch_Step1" accept-policy="ONLY_FAILURES"> 
      <set-payload value="#[getStepExceptions()]" doc:name="Set Payload"/> 
       <foreach collection="#[payload.values()]" doc:name="For Each"> 
        <jms:outbound-endpoint queue="Invalid_Transmission" connector-ref="Active_MQ" doc:name="JMS"/> 
       </foreach> 
      </batch:step> 


     </batch:process-records> 
     <batch:on-complete> 
      <logger message="Completed the insert" level="INFO" doc:name="Logger"/> 
     </batch:on-complete> 
    </batch:job> 

回答

0

你可以嘗試兩件事。

1.Make max-failed-records =「0」,這將在批處理步驟失敗的情況下回滾。

2.將DB連接器映射到事務範圍內,並根據需要處理異常情況。

<transactional action="ALWAYS_BEGIN" doc:name="Transactional"> 
    <db:insert...>........</db:insert> 
</transactional> 

請考慮下面更新的代碼,您可以根據您的要求進行更改。

<batch:job name="lockboxBatch" max-failed-records="0"> 
    <batch:input> 
     <jms:inbound-endpoint topic="lockbox" connector-ref="Active_MQ1" doc:name="JMS"> 
     </jms:inbound-endpoint> 
     <custom-transformer class="transformers.PaymentsTransformer" doc:name="Java"/> 
     <logger level="INFO" doc:name="Logger"/> 
    </batch:input> 
    <batch:process-records> 
     <batch:step name="Batch_Step"> 
      <expression-component doc:name="Expression"><![CDATA[payload[2].batchAmount='hghghfghfhgf']]></expression-component> 
      <batch:commit size="4" doc:name="Batch Commit"> 
        <transactional action="ALWAYS_BEGIN" doc:name="Transactional"> 
         <db:insert config-ref="Oracle_Configuration" bulkMode="true" doc:name="Database"> 
          <db:parameterized-query><![CDATA[INSERT INTO TIB_INT_AR_PAYMENT_IFACE (TRANSMISSION_REQUEST_ID,DESTINATION_ACCOUNT,ORIGINATION,TRANSMISSION_RECORD_COUNT,TRANSMISSION_AMOUNT,LOCKBOX_NUMBER,LOCKBOX_BATCH_COUNT,LOCKBOX_RECORD_COUNT,LOCKBOX_AMOUNT,BATCH_NAME,BATCH_AMOUNT,BATCH_RECORD_COUNT,ITEM_NUMBER,CURRENCY_CODE,REMITTANCE_AMOUNT,TRANSIT_ROUTING_NUMBER,ACCOUNT,CHECK_NUMBER,CUSTOMER_NUMBER,OVERFLOW_INDICATOR,OVERFLOW_SEQUENCE,INVOICE1,AMOUNT_APPLIED1) VALUES (#[payload.?transmissiosnRequestID],#[payload.?destinastionAccount],#[payload.?origination],#[payload.?transmissionSrecordCount],#[payload.?transmisssionAmount],#[payload.lockboxNumber],#[payload.lockboxBatchCount],#[payload.lockboxRecordCount],#[payload.lockboxAmount],#[payload.batchName],#[payload.batchAmount],#[payload.batchRecordCount],#[payload.itemNumber],#[payload.currencyCode],#[payload.remittanceAmount],#[payload.transitRoutingNumber],#[payload.account],#[payload.checkNumber],#[payload.customerNumber],#[payload.overflowIndicator],#[payload.overflowSequence],#[payload.invoice1],#[payload.amountApplied1])]]></db:parameterized-query> 
         </db:insert> 
        </transactional> 

      </batch:commit> 
     </batch:step> 
     <batch:step name="Batch_Step1" accept-policy="ONLY_FAILURES"> 
     <set-payload value="#[getStepExceptions()]" doc:name="Set Payload"/> 
      <foreach collection="#[payload.values()]" doc:name="For Each"> 
       <jms:outbound-endpoint queue="Invalid_Transmission" connector-ref="Active_MQ" doc:name="JMS"/> 
      </foreach> 
     </batch:step> 


    </batch:process-records> 
    <batch:on-complete> 
     <logger message="Completed the insert" level="INFO" doc:name="Logger"/> 
    </batch:on-complete> 
</batch:job> 
+0

感謝您的回覆。如果我將db連接器放入事務範圍內,並且如果發生故障,它會嘗試回滾並重試(再次將其發送到入站JMS),或者它將僅回滾並將其作爲失敗記錄發送到下一步。 – MRavindran