2017-07-29 412 views
0

您好,我正在使用wso2 esb並使用Active MQ進行消息隊列。活動MQ連接問題

我有一個簡單的服務來放置一個消息,其中它調用自定義java類,它創建一個tcp連接並將消息放入隊列中。

Java代碼看起來像下面

package in.esb.custommediators; 

import javax.jms.*; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 


import org.apache.synapse.ManagedLifecycle; 
import org.apache.synapse.MessageContext; 
import org.apache.synapse.core.SynapseEnvironment; 
import org.apache.synapse.mediators.AbstractMediator; 

import org.apache.synapse.core.axis2.Axis2MessageContext; 
import org.apache.synapse.transport.nhttp.NhttpConstants; 

import org.json.JSONObject; 
import org.json.XML; 

public class JMSStoreMediator extends AbstractMediator implements 
ManagedLifecycle { 

    Connection connection; 
    Session session; 

    public boolean mediate(MessageContext msgCtx) { 

     log.info("LogLocation = "+getClass().getName()+",ProxyName = "+msgCtx.getProperty("proxy.name")+ 
       ",Usercode = "+msgCtx.getProperty("usercode")+",Clientid = "+msgCtx.getProperty("clientid")+ 
       ",requestMsgId = "+msgCtx.getProperty("requestMsgId")+",Position = START"); 


     try { 
      boolean topic=false; 
      String jmsuri=""+msgCtx.getProperty("jmsuri"); 
      String t=""+msgCtx.getProperty("topic"); 
      if(t.isEmpty()){ 
       topic=false; 
      } 
      else { 
       topic=Boolean.valueOf(t); 
      } 
      ConnectionFactory factory= new ActiveMQConnectionFactory(jmsuri); 
      connection = factory.createConnection(); 
       connection.start(); 

      log.info("LogLocation = "+getClass().getName()+",JMS connection created :"+connection); 
      this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      Destination destination=null; 
      if(!topic)destination= session.createQueue(""+msgCtx.getProperty("jmsqueue")); 
      else destination= session.createTopic(""+msgCtx.getProperty("jmsqueue")); 
      MessageProducer producer = session.createProducer(destination); 
      producer.setDeliveryMode(DeliveryMode.PERSISTENT); 

      String xml = ""+msgCtx.getEnvelope().getBody().toStringWithConsume(); 

      if(topic){ 

       JSONObject obj=XML.toJSONObject(xml); 
       JSONObject ar=obj.getJSONObject("soapenv:Body"); 
       ar.remove("xmlns:soapenv"); 
       xml=ar.toString(); 
      } 
      TextMessage message = session.createTextMessage(xml); 
      producer.send(message); 


     } catch (Exception e) { 

      log.info("LogLocation = "+getClass().getName()+",Error in storing message in JMS stacktrace is :"+e.toString()+"message is :"+e.getMessage()); 
      e.printStackTrace(); 

      ((Axis2MessageContext) msgCtx).setProperty(NhttpConstants.HTTP_SC, 500); 
      handleException("Error while storing in the message store", msgCtx); 

     } 
     finally { 
      try { 
       session.close(); 
       if (connection!=null){ 
        log.info("LogLocation = "+getClass().getName()+",JMS connection closing :"+connection); 
        connection.close(); 
       } 

      } catch (JMSException e) { 
       log.info("LogLocation = "+getClass().getName()+",Error in closing JMS connection stacktrace is :"+e.toString()); 
       e.printStackTrace(); 
      } 
     } 

     return true; 
    } 

    @Override 
    public void destroy() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void init(SynapseEnvironment arg0) { 
     // TODO Auto-generated method stub 

    } 

} 

當我把這種服務,在隊列中發送消息下方得到生成的日誌。

[2017-07-29 11:18:35,962] INFO - JMSStoreMediator LogLocation = in.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-3:1,clientId=ID:my-desktop-36442-1501307315570-2:1,started=true} 

截至目前每一件事情是工作良好,但是當兩個用戶試圖在同一輪胎一些奇怪的事情發生提交信息如下圖所示

[2017-07-29 11:43:11,948] INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=false} 
[2017-07-29 11:43:11,963] INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=true} 

[2017-07-29 11:43:12,068] INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,Error in closing JMS connection stacktrace is :org.apache.activemq.ConnectionClosedException: The connection is already closed 

主動MQ是創建兩個連接,但使用一個連接對於這兩個調用以及一個連接在一個服務調用中關閉並且在另一個服務調用中拋出已經關閉的錯誤,另一個連接在活動狀態爲true的活動mq的連接列表中永遠等待,如下面的圖像,這也可以在ESB線程列表中看到。

enter image description here

這種連接堆積,造成掛起ESB服務器。即使我從Active MQ重置此連接ESB線程攜帶此連接信息,並且只有在重新啓動ESB後,問題纔會得到解決。

+0

哪裏是連接變量初始化?它看起來像是對另一個線程中可用的連接的引用 – simar

+0

這是自定義介體實現嗎?你能提供完整的課程代碼嗎? 這裏顯然有些多線程問題 – simar

+0

嗨@simar是的,我正在使用一個自定義中介,我已經編輯了我正在使用的完全自定義類的問題,它有一些多線程問題,正如你所說的。 – user4045063

回答

0

你看過文章Extending the Functionality of WSO2 Enterprise Service Bus - Part 1嗎?

重要部件是螺紋安全。它指出,每個中介,包括自定義,都是在傳入消息之間共享的。我建議類變量

Connection connection; 
Session session; 

移到方法公共布爾中介(MessageContext的msgCtx)因爲局部變量是線程安全的

public class JMSStoreMediator extends AbstractMediator implements 
ManagedLifecycle {  

    public boolean mediate(MessageContext msgCtx) { 
      Connection connection; 
      Session session; 
    .... 
    .... 
    rest the same 
+0

根據您的建議進行了更改並進行了檢查,但力度也有同樣的問題。我將閱讀文章。 – user4045063