,我使用wso2esb對它進行了配置以進行消息處理。使用Active MQ 5.10版本的活動MQ tcp連接失敗
大約7-10天后,活動MQ正在拋出tcp連接失敗異常,因爲ESB未收到成功的tcp連接,因此未能在隊列中提交消息。
在這種情況下,我重新啓動服務器,並再次運行7-10天,同樣的事情重複。
我的問題是
什麼可能是積極的MQ的確切原因停止給成功的TCP連接..?
爲什麼重啓服務器後它恢復正常狀態..?
有沒有最好的解決方案,以在來這個問題..
的內存配置,activemq.xml中文件
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage limit="1430 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="300 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
這裏是一個被放置在消息隊列中的代理服務。在代理服務,它會檢查用戶認證是否是真的,用戶可以在隊列中的位置信息,在這裏我使用的是類中介那裏,因爲它與主動MQ連接,並把消息
<proxy xmlns="http://ws.apache.org/ns/synapse"
name="JmsStore2.0"
transports="https http"
startOnLoad="true"
trace="disable"
statistics="enable">
<description/>
<target>
<inSequence onError="fault">
<property name="messageType" value="application/json" scope="axis2"/>
<property name="FORCE_ERROR_ON_SOAP_FAULT" value="true"/>
<property name="jmsuri" value="tcp://0.0.0.0:61616"/>
<property name="jmsqueue" expression="get-property('transport', 'jmsqueue')"/>
<property name="readingspayload" expression="$body" type="OM"/>
<property name="username" expression="get-property('transport', 'username')"/>
<property name="password" expression="get-property('transport', 'password')"/>
<property name="PartyBranchID"
expression="//FieldValue/text()"
scope="default"
type="STRING"/>
<property name="Body" expression="$body" scope="default" type="STRING"/>
<property name="usercode"
expression="fn:substring-before(get-property('username'),'|')"
scope="default"
type="STRING"/>
<property name="clientid"
expression="fn:substring-after(get-property('username'),'|')"
scope="default"
type="STRING"/>
<property name="requestMsgId"
expression="get-property('MessageID')"
scope="default"
type="STRING"/>
<property name="client_ip_address"
expression="get-property('axis2','REMOTE_ADDR')"
scope="default"
type="STRING"/>
<payloadFactory media-type="xml">
<format>
<send xmlns="">
<username>$1</username>
<password>$2</password>
</send>
</format>
<args>
<arg evaluator="xml" expression="get-property('username')"/>
<arg evaluator="xml" expression="get-property('password')"/>
</args>
</payloadFactory>
<send receive="JmsStore_Seq">
<endpoint>
<address uri="http://localhost:8282/services/Login2.0" format="soap11">
<suspendOnFailure>
<errorCodes>101500,101501,101506,101507,101508,101503,50000</errorCodes>
<initialDuration>30</initialDuration>
<progressionFactor>1.0</progressionFactor>
<maximumDuration>300</maximumDuration>
</suspendOnFailure>
</address>
</endpoint>
</send>
</inSequence>
<outSequence onError="fault">
<send/>
</outSequence>
</target>
</proxy>
序列:
<sequence xmlns="http://ws.apache.org/ns/synapse"
name="JmsStore_Seq"
trace="disable">
<property name="FORCE_ERROR_ON_SOAP_FAULT" value="true"/>
<property xmlns:ns="http://org.apache.synapse/xsd"
name="Authentication"
expression="//Authentication/text()"/>
<property xmlns:ns="http://org.apache.synapse/xsd"
name="UserId"
expression="//UserId/text()"
scope="default"
type="STRING"/>
<property xmlns:ns="http://org.apache.synapse/xsd"
name="WorkOUid"
expression="//WorkOUid/text()"/>
<property xmlns:ns="http://org.apache.synapse/xsd"
name="WorkPartyBranchId"
expression="//WorkPartyBranchId/text()"/>
<filter xmlns:ns="http://org.apache.synapse/xsd"
xpath="get-property('Authentication')=''">
<then>
<payloadFactory media-type="xml">
<format>
<ResponseJSON xmlns="">
<Exception>Service trying to connect inactive service</Exception>
<Status>101503</Status>
</ResponseJSON>
</format>
<args/>
</payloadFactory>
<property name="messageType" value="application/json" scope="axis2"/>
<property name="HTTP_METHOD" value="POST" scope="axis2" type="STRING"/>
<property name="RESPONSE" value="true" scope="default" type="STRING"/>
<property name="NO_ENTITY_BODY" scope="axis2" action="remove"/>
<send/>
</then>
<else>
<filter xpath="get-property('Authentication')='false'">
<then>
<payloadFactory media-type="xml">
<format>
<ResponseJSON xmlns="">
<Exception>Authentication Failed</Exception>
<Status>401</Status>
</ResponseJSON>
</format>
<args/>
</payloadFactory>
<property name="messageType" value="application/json" scope="axis2"/>
<property name="HTTP_METHOD" value="POST" scope="axis2" type="STRING"/>
<property name="RESPONSE" value="true" scope="default" type="STRING"/>
<property name="NO_ENTITY_BODY" scope="axis2" action="remove"/>
<send/>
</then>
<else>
<property name="jmspayload"
expression="get-property('readingspayload')"
type="OM"/>
<property name="ResponseJSON" expression="$body/ResponseJSON" type="OM"/>
<property name="jmsuri" expression="get-property('jmsuri')"/>
<property name="jmsqueue" expression="get-property('jmsqueue')"/>
<payloadFactory media-type="xml">
<format>
<PLData>
<JMpayload>$1</JMpayload>
<AuthData>$2</AuthData>
<LogData>
<usercode>$3</usercode>
<clientid>$4</clientid>
<requestMsgId>$5</requestMsgId>
</LogData>
</PLData>
</format>
<args>
<arg evaluator="xml" expression="get-property('jmspayload')"/>
<arg evaluator="xml" expression="get-property('ResponseJSON')"/>
<arg evaluator="xml" expression="get-property('usercode')"/>
<arg evaluator="xml" expression="get-property('clientid')"/>
<arg evaluator="xml" expression="get-property('requestMsgId')"/>
</args>
</payloadFactory>
<class name="in.youtility.esb.custommediators.JMSStoreMediator"/>
<payloadFactory media-type="xml">
<format>
<ResponseJSON xmlns="">
<Body>
<Datalist>
<Data>Successfully stored</Data>
</Datalist>
</Body>
<Status>200</Status>
</ResponseJSON>
</format>
<args/>
</payloadFactory>
<property name="messageType" value="application/json" scope="axis2"/>
<header name="To" action="remove"/>
<property name="NO_ENTITY_BODY" scope="axis2" action="remove"/>
<property name="RESPONSE" value="true"/>
<send/>
</else>
</filter>
</else>
</filter>
<description/>
</sequence>
類中保:
public class JMSStoreMediator extends AbstractMediator implements
ManagedLifecycle {
Connection connection;
public boolean mediate(MessageContext msgCtx) {
try {
boolean topic=false;
String jmsuri=""+msgCtx.getProperty("jmsuri");
String t=""+msgCtx.getProperty("topic");
if(t.isEmpty()){
topic=false;
}
else {
topic=Boolean.valueOf(t);
}
ConnectionFactory factory= new ActiveMQConnectionFactory(jmsuri);
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination=null;
if(!topic)destination= session.createQueue(""+msgCtx.getProperty("jmsqueue"));
else destination= session.createTopic(""+msgCtx.getProperty("jmsqueue"));
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
String xml = ""+msgCtx.getEnvelope().getBody().toStringWithConsume();
if(topic){
JSONObject obj=XML.toJSONObject(xml);
JSONObject ar=obj.getJSONObject("soapenv:Body");
ar.remove("xmlns:soapenv");
xml=ar.toString();
}
TextMessage message = session.createTextMessage(xml);
producer.send(message);
} catch (Exception e) {
log.info("LogLocation = "+getClass().getName()+",Error in storing message in JMS stacktrace is :"+e.toString());
((Axis2MessageContext) msgCtx).setProperty(NhttpConstants.HTTP_SC, 500);
handleException("Error while storing in the message store", msgCtx);
}
finally {
try {
connection.close();
} catch (JMSException e) {
log.info("LogLocation = "+getClass().getName()+",Error in closing JMS connection stacktrace is :"+e.toString());
}
}
log.info("LogLocation = "+getClass().getName()+",ProxyName = "+msgCtx.getProperty("proxy.name")+
",Usercode = "+msgCtx.getProperty("usercode")+",Clientid = "+msgCtx.getProperty("clientid")+
",requestMsgId = "+msgCtx.getProperty("requestMsgId")+",Position = END");
return true;
}
ActiveMQ中的消息數量日增?您如何使用AMQ,使用JMS代理服務和MessageStore? –
嗨@ Jean-Michel感謝您的回覆,我正在使用ESB代理服務,它將在隊列中放置消息,並使用其他JMS代理服務來監聽此隊列並處理它。經常有消息進入隊列,無法處理任何問題,直到Active MQ成功連接到ESB。 – user4045063
我發佈了第一個答案,但告訴我在ActiveMQ中,如果發生故障(在隊列中等待數千條消息),消息的數量是否增加 –