2015-06-13 97 views
1

我創建了一個java應用程序,它使用消息驅動bean(MDB)通過資源適配器activeMq 5.10在glassfish中從主題apache apollo獲取消息。當我使用apache ActiveMQ,它工作正常,但不適用於Apache阿波羅。我使用mqtt發送主題爲a和mqtt的消息來監聽該主題並獲取消息。但是,當我使用MDB進行收聽時,它無法收到任何消息。我看到apollo控制檯,標籤虛擬主機 - >主題。我單擊用於發送和接收消息的主題。該主題有消費者和生產者,「入隊:2項/2.72kb」,但消費者「轉賬= 0」。我不知道如何與Apollo進行MDB工作。使用消息驅動bean從主題獲取消息apache apollo

apollo.xml

<broker xmlns="http://activemq.apache.org/schema/activemq/apollo"> 
    <notes> 
     The default configuration with tls/ssl enabled. 
    </notes> 
    <log_category console="console" `enter code here`security="security"connection="connection"audit="audit"/> 
    <authentication domain="apollo"/> 
    <!-- Give admins full access --> 
    <access_rule allow="admins" action="*"/> 
    <access_rule allow="*" action="*"/> 
    <virtual_host id="benchmark-broker"> 
    <host_name>benchmark-broker</host_name> 
    <host_name>localhost</host_name> 
    <host_name>127.0.0.1</host_name> 
    <authentication enabled="false"/> 
    <topic slow_consumer_policy="block" > 
     <subscription tail_buffer="4k"/> 
    </topic> 
    </virtual_host> 
    <web_admin bind="http://0.0.0.0:9022"/> 
    <web_admin bind="https://127.0.0.1:61681"/> 
    <connector id="tcpMqtt" bind="tcp://0.0.0.0:7521" connection_limit="60000"> 
     <mqtt max_message_length="104857600" /> 
    </connector> 
<connector id="tcpOpenwire" bind="tcp://0.0.0.0:9021" connection_limit="60000"> 
    <openwire tight_encoding="false" tcp_no_delay="true"/> 
</connector> 
</broker> 

代碼發佈消息:

String user = env("APOLLO_USER", "admin"); 
    String password = env("APOLLO_PASSWORD", "password"); 
    String host = env("APOLLO_HOST", "192.168.0.54"); 
    int port = Integer.parseInt(env("APOLLO_PORT", "7521")); 
    final String destination = arg(args, 0, "cuongdm17"); 
    int messages = 1; 
    String body = "test"; 

    MQTT mqtt = new MQTT(); 
    mqtt.setHost(host, port); 
    mqtt.setUserName(user); 
    mqtt.setPassword(password); 

    FutureConnection connection = mqtt.futureConnection(); 
    connection.connect().await(); 
    final LinkedList<Future<Void>> queue = new LinkedList<Future<Void>>(); 
    UTF8Buffer topic = new UTF8Buffer(destination); 
    for (int i = 1; i <= messages; i++) { 
     queue.add(connection.publish(topic, msg, QoS.AT_LEAST_ONCE, false)); 
    } 

MDB:

@MessageDriven(activationConfig = { 
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), 
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), 
@ActivationConfigProperty(propertyName = "destination", propertyValue = "cuongdm17") 
}) 
public class Cuongdm17 extends AmiAbstractMQTTProcessor implements MessageListener, Serializable { 

static final Logger logger = Logger.getLogger(AmiMonitorProcessor.class.getName()); 

public Cuongdm17() { 
} 

@Override 
protected Logger getLogger() { 
    return logger; 
} 

/** 
* Handler MQTT (already transcript to JMS) messages from DCU Device 
* 
* @param message 
*/ 
@Override 
public void onMessage(Message message) { 
    try { 
     info("cuongdm17"); 
    } catch (Exception e) { 
     error("Error when processing onMessage, error message=" + e.getMessage() + ", message trace=" + message, e); 
    } 
} 

@Override 
protected void receiveMessage(MQTTPayloadData payload) throws Exception { 
    info("cuongdm17"); 
} 

@Override 
public String toString() { 
    return "Current-Info Processor"; 
} 
} 

回答

0

即使阿波羅有着極大的現代化的核心,它不具有的所有功能的ActiveMQ。雖然ActiveMQ支持生產者/消費者之間的混合線協議 - Apollo則不支持。如果切換到Apollo,那麼您也可以切換到使用MQTT進行消費。

+0

我明白了,當我嘗試使用ActiveMQConnectionFactory創建消費者和製作者時,像apollo的例子,MDB可以獲取消息。 MQTT可能不適用於apollo中的MDB,或者我的代碼有問題。 –

+0

MDB使用OpenWire協議 - 即使用JMS接口/ Sematics。所以,不可以在Apollo中使用MQTT消息 –