是否存在使用ActiveMQ和JMS(更確切地說,使用JmsTemplate)和Spring框架的生產者的重試機制/解決方案的機制或示例實現?
我想要處理的用例是,當代理不可用時,例如,我想進行一些重試次數,最多6次(如果可能的話,每次之間有指數延遲)。所以,我還需要跟蹤每次嘗試之間的消息重試次數。 我知道爲消費者重新傳遞的政策,但我也想實現一個可靠的生產者的客戶端以及使用JMS和Spring框架的ActiveMQ的生產者客戶端的重試機制
感謝, 西麥
是否存在使用ActiveMQ和JMS(更確切地說,使用JmsTemplate)和Spring框架的生產者的重試機制/解決方案的機制或示例實現?
我想要處理的用例是,當代理不可用時,例如,我想進行一些重試次數,最多6次(如果可能的話,每次之間有指數延遲)。所以,我還需要跟蹤每次嘗試之間的消息重試次數。 我知道爲消費者重新傳遞的政策,但我也想實現一個可靠的生產者的客戶端以及使用JMS和Spring框架的ActiveMQ的生產者客戶端的重試機制
感謝, 西麥
我認爲最簡單的方法是通過使用啓用了持久性的嵌入式代理來使用存在的代理,生產者必須使用它來發送消息,並創建駱駝路由以從本地隊列讀取並轉發到遠程的一個或通過使用JmsBridgeConnector
或NetworkConnector
堅果我認爲JmsBridgeConnector
更容易。 這裏是一個春天的代碼示例: 生產者必須使用jmsConnectionFactory()
創建ConnectionFactory
package com.example.amq;
import java.io.File;
import javax.jms.ConnectionFactory;
import javax.jms.QueueConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.network.jms.OutboundQueueBridge;
import org.apache.activemq.network.jms.ReconnectionPolicy;
import org.apache.activemq.network.jms.SimpleJmsQueueConnector;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ActiveMQConfiguration {
public static final String DESTINATION_NAME = "localQ";
@Bean // (initMethod = "start", destroyMethod = "stop")
public BrokerService broker() throws Exception {
final BrokerService broker = new BrokerService();
broker.addConnector("vm://localhost");
SimpleJmsQueueConnector simpleJmsQueueConnector = new SimpleJmsQueueConnector();
OutboundQueueBridge bridge = new OutboundQueueBridge();
bridge.setLocalQueueName(DESTINATION_NAME);
bridge.setOutboundQueueName("remoteQ");
OutboundQueueBridge[] outboundQueueBridges = new OutboundQueueBridge[] { bridge };
simpleJmsQueueConnector.getReconnectionPolicy().setMaxSendRetries(ReconnectionPolicy.INFINITE);
simpleJmsQueueConnector.setOutboundQueueBridges(outboundQueueBridges);
simpleJmsQueueConnector.setLocalQueueConnectionFactory((QueueConnectionFactory) jmsConnectionFactory());
simpleJmsQueueConnector.setOutboundQueueConnectionFactory(outboundQueueConnectionFactory());
JmsConnector[] jmsConnectors = new JmsConnector[] { simpleJmsQueueConnector };
broker.setJmsBridgeConnectors(jmsConnectors);
PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
File dir = new File(System.getProperty("user.home") + File.separator + "kaha");
if (!dir.exists()) {
dir.mkdirs();
}
persistenceAdapter.setDirectory(dir);
broker.setPersistenceAdapter(persistenceAdapter);
broker.setPersistent(true);
broker.setUseShutdownHook(false);
broker.setUseJmx(true);
return broker;
}
@Bean
public QueueConnectionFactory outboundQueueConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"auto://localhost:5671");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
@Bean
public ConnectionFactory jmsConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
connectionFactory.setObjectMessageSerializationDefered(true);
connectionFactory.setCopyMessageOnSend(false);
return connectionFactory;
}
}
使用駱駝:
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.activemq.camel.component.ActiveMQConfiguration;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class ActiveMQCamelBridge {
public static void main(String args[]) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addComponent("inboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:61616"));
ActiveMQComponent answer = ActiveMQComponent.activeMQComponent("tcp://localhost:5671");
if (answer.getConfiguration() instanceof ActiveMQConfiguration) {
((ActiveMQConfiguration) answer.getConfiguration()).setUserName("admin");
((ActiveMQConfiguration) answer.getConfiguration()).setPassword("admin");
}
context.addComponent("outboundQueue", answer);
context.addRoutes(new RouteBuilder() {
public void configure() {
from("inboundQueue:queue:localQ").to("outboundQueue:queue:remoteQ");
}
});
context.start();
Thread.sleep(60 * 5 * 1000);
context.stop();
}
}
感謝您的回答。從我看到的想法是有一個本地代理,所有消息都被保存在一個文件中,然後通過JmsBridgeConnector轉發給遠程代理。將所有消息寫入文件,並不反映性能太多?此外,如果遠程代理已關閉,我目前是否瞭解在停機期間發送給本地代理的所有消息都會留在那裏(在本地代理中),直到遠程代理再次啓動並且所有消息都被轉發爲止? –
如果遠程代理已關閉,我當前是否瞭解在停機期間發送給本地代理的所有消息都會留在那裏(在本地代理中),直到遠程代理再次啓動並且所有要轉發的消息==>是的 –
寫入所有消息到文件,不反映到性能太多? ==>消息在KahaDBStore –
生產者不提供任何形式的重試機制,消費者喜歡。您需要在代碼中確認由生產商通過代理確認發送的消息。
感謝您的評論,你有一個示例解決方案或製造商的重試機制的想法? –
本文檔可能幫助你一樣。 http://activemq.apache.org/failover-transport-reference.html – Nir
謝謝,故障轉移傳輸引用是拓撲解決方案的一個很好的例子,它確實提供了一個很好的插件來實現可讀性,但我也想保護消息不被丟失,並確保它們將交付給經紀人,同樣在沒有任何經紀人可用的情況下。我的想法是,當我有這種情況時,將所有消息保存在數據庫中,並且代理的連接返回時發送所有持久消息。我想通過重試機制捕獲消費者自己的任何經紀人 –
實際上,與TransportListener結合使用故障轉移傳輸也提供了一個很好的解決方案。感謝您的鏈接 –