1
我開發了JMS應用程序,現在我想添加將支持代理重啓的功能。我有動態的話題,我應該在我的連接恢復後重新創建它們。此外,我應該有可能知道經紀商何時倒閉以及何時倒閉。 所以我嘗試使用ActiveMQ故障轉移協議實現此功能。我實現TransportListener和方法「transportInterrupted」我叫完全斷開像Transport Listener和ActiveMq重啓
public void disconnect() throws JMSException {
System.out.println("!!!!!!!DISCONNECTING!!!!!!!!");
consumer.close();
session.close();
session = null;
messageProducer.close();
messageProducer = null;
connection = null;
connected = false;
System.out.println("!!!!!!!DISCONNECTED!!!!!!!!");
}
這是我的應用程序掛起後,它就像競爭條件。如果我關閉()只有生產者和連接設置爲null一切正常,但如果我嘗試關閉消費者,它只能從N個案例中工作。我寫測試證明它。我認爲關閉消費者的問題,但我沒有找到任何信息,我做錯了什麼。
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class FastFailProducer {
volatile boolean connected = false;
private ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?timeout=5000");;
private static FailoverListener failoverListener;
private Connection connection;
private Session session;
private Queue queue;
private MessageProducer messageProducer;
private MessageConsumer consumer;
private String something;
public void init() throws JMSException {
System.out.println("!!!!!!!CONNECTING!!!!!!!!");
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
((ActiveMQConnection) connection).addTransportListener(failoverListener);
queue = session.createQueue("TEST");
messageProducer = session.createProducer(queue);
consumer = session.createConsumer(queue);
System.out.println("!!!!!!!CONNECTING COMPLETE!!!!!!!!");
connected = true;
}
public void disconnect() throws JMSException {
System.out.println("!!!!!!!DISCONNECTING!!!!!!!!");
consumer.close();
session.close();
session = null;
messageProducer.close();
messageProducer = null;
connection = null;
connected = false;
System.out.println("!!!!!!!DISCONNECTED!!!!!!!!");
}
public void run() throws Exception {
// send messages
for (int i = 0; i < 1000; i++) {
if (connected) {
if (session != null & messageProducer != null & queue != null) {
// send a message
messageProducer.send(session.createTextMessage(i + " message"));
System.out.println("Sent message " + i);
}
} else {
// execute your backup logic
System.out.println("Message " + i + " not sent");
}
Thread.sleep(1000);
}
messageProducer.close();
session.close();
connection.close();
System.exit(0);
}
public static void main(String[] args) throws Exception {
FastFailProducer failoverProducer = new FastFailProducer();
failoverProducer.something = "asdfasdf";
failoverListener = new FailoverListener(failoverProducer);
failoverProducer.init();
failoverProducer.setConnected(true);
failoverProducer.run();
}
public boolean isConnected() {
return connected;
}
public void setConnected(boolean connected) {
this.connected = connected;
}
}
TransportListenerImpl類
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.transport.TransportListener;
public class FailoverListener implements TransportListener {
private FastFailProducer failProducer;
public FailoverListener(FastFailProducer failProducer) {
super();
this.failProducer = failProducer;
}
@Override
public void onCommand(Object arg0) {
}
@Override
public void onException(IOException arg0) {
}
@Override
public void transportInterupted() {
try {
failProducer.disconnect();
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void transportResumed() {
System.out.println("!!!!!!!TRANSPORT RESUMED!!!!!!!!");
if (!failProducer.isConnected()) {
try {
failProducer.init();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
哦,謝謝。有時候我覺得這樣實現這個功能。但是,這是否意味着我應該自己實施重新連接邏輯?我將在我的連接上添加一些ExceptionListener,並且當我收到某種IOException或JMSException時,我必須嘗試重新連接到代理? 爲什麼這個示例掛起試圖關閉消費者或會話非常有趣。我沒有關閉連接,我只是關閉消費者/生產者和會議...... –
事情可以掛在這裏,因爲運輸發射事件是在重新連接邏輯中,並持有可能導致死鎖的內部鎖,如果你做了類似關閉消費者。 TransportListener只是爲您的應用程序提供通知機制,您的控制邏輯應該從其他某個線程觸發。從我最初發布的信息中可以得知,當代理髮生故障時,您應該做任何工作,故障轉移傳輸將重新建立客戶端連接和資源,就像代理商崩潰時一樣。 –