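Using a message-driven bean to get messages from a topic on Apache Apollo
I created a Java application that uses a message-driven bean (MDB) in GlassFish, through the ActiveMQ 5.10 resource adapter, to receive messages from a topic on Apache Apollo. It works fine with Apache ActiveMQ, but not with Apache Apollo. I publish a message to a topic over MQTT, and an MQTT subscriber listening on that topic receives it. But when I listen with the MDB, it receives nothing. In the Apollo console, under Virtual Hosts -> Topics, I clicked the topic I use for sending and receiving. It shows both a consumer and a producer, and "Enqueued: 2 items / 2.72 kb", but the consumer shows "Transferred = 0". I don't know how to make an MDB work with Apollo.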
apollo.xml
<broker xmlns="http://activemq.apache.org/schema/activemq/apollo">
<notes>
The default configuration with tls/ssl enabled.
</notes>
<log_category console="console" security="security" connection="connection" audit="audit"/>
<authentication domain="apollo"/>
<!-- Give admins full access -->
<access_rule allow="admins" action="*"/>
<access_rule allow="*" action="*"/>
<virtual_host id="benchmark-broker">
<host_name>benchmark-broker</host_name>
<host_name>localhost</host_name>
<host_name>127.0.0.1</host_name>
<authentication enabled="false"/>
<topic slow_consumer_policy="block" >
<subscription tail_buffer="4k"/>
</topic>
</virtual_host>
<web_admin bind="http://0.0.0.0:9022"/>
<web_admin bind="https://127.0.0.1:61681"/>
<connector id="tcpMqtt" bind="tcp://0.0.0.0:7521" connection_limit="60000">
<mqtt max_message_length="104857600" />
</connector>
<connector id="tcpOpenwire" bind="tcp://0.0.0.0:9021" connection_limit="60000">
<openwire tight_encoding="false" tcp_no_delay="true"/>
</connector>
</broker>
Code that publishes the message:
String user = env("APOLLO_USER", "admin");
String password = env("APOLLO_PASSWORD", "password");
String host = env("APOLLO_HOST", "192.168.0.54");
int port = Integer.parseInt(env("APOLLO_PORT", "7521"));
final String destination = arg(args, 0, "cuongdm17");
int messages = 1;
String body = "test";
MQTT mqtt = new MQTT();
mqtt.setHost(host, port);
mqtt.setUserName(user);
mqtt.setPassword(password);
FutureConnection connection = mqtt.futureConnection();
connection.connect().await();
final LinkedList<Future<Void>> queue = new LinkedList<Future<Void>>();
UTF8Buffer topic = new UTF8Buffer(destination);
for (int i = 1; i <= messages; i++) {
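// Wrap the body string as the payload (AsciiBuffer is in org.fusesource.hawtbuf)
queue.add(connection.publish(topic, new AsciiBuffer(body), QoS.AT_LEAST_ONCE, false));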
}
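The publishing snippet calls `env(...)` and `arg(...)`, which are not shown in the post. They are presumably small fallback helpers like the ones in the Apollo example programs; the sketch below is an assumption about their behavior, and the class name `PublisherHelpers` is hypothetical.

```java
// Hypothetical holder class; the post does not show where these helpers live.
final class PublisherHelpers {
    private PublisherHelpers() {}

    /** Returns the environment variable named key, or defaultValue if it is unset. */
    static String env(String key, String defaultValue) {
        String value = System.getenv(key);
        return value != null ? value : defaultValue;
    }

    /** Returns args[index] if that position exists, otherwise defaultValue. */
    static String arg(String[] args, int index, String defaultValue) {
        return (index >= 0 && index < args.length) ? args[index] : defaultValue;
    }
}
```

With helpers like these, running the publisher with no arguments and no `APOLLO_*` variables set falls back to the defaults in the snippet: host `192.168.0.54`, port `7521`, topic `cuongdm17`.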
MDB:
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "cuongdm17")
})
public class Cuongdm17 extends AmiAbstractMQTTProcessor implements MessageListener, Serializable {
static final Logger logger = Logger.getLogger(AmiMonitorProcessor.class.getName());
public Cuongdm17() {
}
@Override
protected Logger getLogger() {
return logger;
}
/**
* Handles MQTT messages (already translated to JMS) from the DCU device.
*
* @param message the incoming JMS message
*/
@Override
public void onMessage(Message message) {
try {
info("cuongdm17");
} catch (Exception e) {
error("Error when processing onMessage, error message=" + e.getMessage() + ", message trace=" + message, e);
}
}
@Override
protected void receiveMessage(MQTTPayloadData payload) throws Exception {
info("cuongdm17");
}
@Override
public String toString() {
return "Current-Info Processor";
}
}
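I see that when I create the consumer and producer with ActiveMQConnectionFactory, as in the Apollo examples, the MDB does receive the messages. Maybe MQTT does not work with an MDB in Apollo, or something is wrong with my code.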
The MDB uses the OpenWire protocol, i.e. it goes through the JMS interfaces/semantics. So no, it cannot consume MQTT messages in Apollo.