2013-01-05 56 views
1

我開發了JMS應用程序,現在我想添加將支持代理重啓的功能。我有動態的話題,我應該在我的連接恢復後重新創建它們。此外,我應該有可能知道經紀商何時倒閉以及何時倒閉。 所以我嘗試使用ActiveMQ故障轉移協議實現此功能。我實現TransportListener和方法「transportInterrupted」我叫完全斷開像Transport Listener和ActiveMq重啓

public void disconnect() throws JMSException { 
    System.out.println("!!!!!!!DISCONNECTING!!!!!!!!"); 
    consumer.close(); 
    session.close(); 
    session = null; 
    messageProducer.close(); 
    messageProducer = null; 
    connection = null; 
    connected = false; 
    System.out.println("!!!!!!!DISCONNECTED!!!!!!!!"); 
} 

這是我的應用程序掛起後,它就像競爭條件。如果我關閉()只有生產者和連接設置爲null一切正常,但如果我嘗試關閉消費者,它只能從N個案例中工作。我寫測試證明它。我認爲關閉消費者的問題,但我沒有找到任何信息,我做錯了什麼。

import javax.jms.Connection; 
import javax.jms.JMSException; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageProducer; 
import javax.jms.Queue; 
import javax.jms.Session; 

import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory; 
public class FastFailProducer { 
    volatile boolean connected = false; 
    private ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?timeout=5000");; 
    private static FailoverListener failoverListener; 
    private Connection connection; 
    private Session session; 
    private Queue queue; 
    private MessageProducer messageProducer; 
    private MessageConsumer consumer; 
    private String something; 

public void init() throws JMSException { 
    System.out.println("!!!!!!!CONNECTING!!!!!!!!"); 
    connection = factory.createConnection(); 
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    connection.start(); 
    ((ActiveMQConnection) connection).addTransportListener(failoverListener); 
    queue = session.createQueue("TEST"); 
    messageProducer = session.createProducer(queue); 
    consumer = session.createConsumer(queue); 
    System.out.println("!!!!!!!CONNECTING COMPLETE!!!!!!!!"); 
    connected = true; 
} 

public void disconnect() throws JMSException { 
    System.out.println("!!!!!!!DISCONNECTING!!!!!!!!"); 
    consumer.close(); 
    session.close(); 
    session = null; 
    messageProducer.close(); 
    messageProducer = null; 
    connection = null; 
    connected = false; 
    System.out.println("!!!!!!!DISCONNECTED!!!!!!!!"); 
} 

public void run() throws Exception { 
    // send messages 
    for (int i = 0; i < 1000; i++) { 
     if (connected) { 
      if (session != null & messageProducer != null & queue != null) { 
       // send a message 
       messageProducer.send(session.createTextMessage(i + " message")); 
       System.out.println("Sent message " + i); 
      } 
     } else { 
      // execute your backup logic 
      System.out.println("Message " + i + " not sent"); 

     } 
     Thread.sleep(1000); 
    } 

    messageProducer.close(); 
    session.close(); 
    connection.close(); 
    System.exit(0); 
} 

public static void main(String[] args) throws Exception { 
    FastFailProducer failoverProducer = new FastFailProducer(); 
    failoverProducer.something = "asdfasdf"; 
    failoverListener = new FailoverListener(failoverProducer); 
    failoverProducer.init(); 
    failoverProducer.setConnected(true); 
    failoverProducer.run(); 

} 

public boolean isConnected() { 
    return connected; 
} 

public void setConnected(boolean connected) { 
    this.connected = connected; 
} 
} 

TransportListenerImpl類

import java.io.IOException; 

import javax.jms.JMSException; 

import org.apache.activemq.transport.TransportListener; 

public class FailoverListener implements TransportListener { 
    private FastFailProducer failProducer; 

public FailoverListener(FastFailProducer failProducer) { 
    super(); 
    this.failProducer = failProducer; 
} 

@Override 
public void onCommand(Object arg0) { 
} 

@Override 
public void onException(IOException arg0) { 

} 

@Override 
public void transportInterupted() { 
    try { 
     failProducer.disconnect(); 
    } catch (JMSException e) { 
     e.printStackTrace(); 
    } 
} 

@Override 
public void transportResumed() { 
    System.out.println("!!!!!!!TRANSPORT RESUMED!!!!!!!!"); 
    if (!failProducer.isConnected()) { 
     try { 
      failProducer.init(); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
} 

回答

2

我認爲你缺少使用故障切換協議的地步。如果您使用故障轉移,則無需關閉連接及其關聯資源,因爲故障轉移傳輸將負責恢復客戶端上的所有內容,就像在代理斷開之前那樣。在事件方法中關閉連接肯定會鎖定,因爲您不希望這樣做。如果您希望在代理程序消失時關閉所有內容,請不要使用故障轉移,而應該監聽JMS異常偵聽器事件掛接。

+0

哦,謝謝。有時候我覺得這樣實現這個功能。但是,這是否意味着我應該自己實施重新連接邏輯?我將在我的連接上添加一些ExceptionListener,並且當我收到某種IOException或JMSException時,我必須嘗試重新連接到代理? 爲什麼這個示例掛起試圖關閉消費者或會話非常有趣。我沒有關閉連接,我只是關閉消費者/生產者和會議...... –

+0

事情可以掛在這裏,因爲運輸發射事件是在重新連接邏輯中,並持有可能導致死鎖的內部鎖,如果你做了類似關閉消費者。 TransportListener只是爲您的應用程序提供通知機制,您的控制邏輯應該從其他某個線程觸發。從我最初發布的信息中可以得知,當代理髮生故障時,您應該做任何工作,故障轉移傳輸將重新建立客戶端連接和資源,就像代理商崩潰時一樣。 –