2013-01-15 40 views
18

我正在爲我的團隊的某個應用程序研究排隊解決方案。理想情況下,我們希望可以配置爲輕量級進程中代理(用於線程之間的低吞吐量消息傳遞)以及作爲外部代理。有沒有可以做到這一點的MQ服務器?大多數似乎需要設置爲外部實體。 ZeroMQ似乎最接近進程內解決方案,但似乎更像是「類固醇上的UDP套接字」,我們需要可靠的交付。是否有任何可以在Java進程中運行的MQ服務器?

+1

我認爲http://stackoverflow.com/questions/2507536/lightweight-jms-broker的答案包含有趣的信息(例如[ffmq](http://ffmq.sourceforge.net/index .html)建議)。 ActiveMQ雖然是另一個較重的候選人,但它也是可嵌入的。 – fvu

+1

就像@fvu所說的,ActiveMQ比ZeroMQ稍重,但它作爲一個嵌入式過程非常有效。如果你使用的是Spring,設置起來非常簡單。 –

+0

ZeroMQ在TCP(而不是UDP)之上運行,可以提供可靠的傳輸。但是,你是指一個持久隊列嗎?即支持光盤? –

回答

10

就像我們說的ActiveMQZeroMQ重一點,但它作爲一個嵌入式過程非常有效。 這裏有一個簡單的例子SpringActiveMQ

將用於所測試的隊列的消息偵聽器:

public class TestMessageListener implements MessageListener { 

    private static final Logger logger = LoggerFactory.getLogger(TestMessageListener.class); 

    @Override 
    public void onMessage(Message message) { 

     /* Receive the text message */ 
     if (message instanceof TextMessage) { 

      try { 
       String text = ((TextMessage) message).getText(); 
       System.out.println("Message reception from the JMS queue : " + text); 

      } catch (JMSException e) { 
       logger.error("Error : " + e.getMessage()); 
      } 

     } else { 
      /* Handle non text message */ 
     } 
    } 
} 

ActiveMQ上下文配置:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
         http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <bean id="jmsQueueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> 
     <property name="brokerURL"> 
      <value>tcp://localhost:61617</value> 
     </property> 
    </bean> 

    <bean id="pooledJmsQueueConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 
     <constructor-arg ref="jmsQueueConnectionFactory" /> 
    </bean> 

    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 
     <constructor-arg value="messageQueue" /> 
    </bean> 

    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> 
     <constructor-arg ref="pooledJmsQueueConnectionFactory" /> 
     <property name="pubSubDomain" value="false"/> 
    </bean> 

    <bean id="testMessageListener" class="com.example.jms.TestMessageListener" /> 

    <bean id="messageQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
     <property name="connectionFactory" ref="pooledJmsQueueConnectionFactory" /> 
     <property name="destination" ref="QueueDestination" /> 
     <property name="messageListener" ref="testMessageListener" /> 
     <property name="concurrentConsumers" value="5" /> 
     <property name="acceptMessagesWhileStopping" value="false" /> 
     <property name="recoveryInterval" value="10000" /> 
     <property name="cacheLevelName" value="CACHE_CONSUMER" /> 
    </bean> 

</beans> 

JUnit測試:

@ContextConfiguration(locations = {"classpath:/activeMQ-context.xml"}) 
public class SpringActiveMQTest extends AbstractJUnit4SpringContextTests { 

    @Autowired 
    private JmsTemplate template; 

    @Autowired 
    private ActiveMQDestination destination; 

    @Test 
    public void testJMSFactory() { 
     /* sending a message */ 
     template.convertAndSend(destination, "Hi"); 

     /* receiving a message */ 
     Object msg = template.receive(destination); 
     if (msg instanceof TextMessage) { 
      try { 
       System.out.println(((TextMessage) msg).getText()); 
      } catch (JMSException e) { 
       System.out.println("Error : " + e.getMessage()); 
      } 
     } 
    } 
} 

的依賴要添加到pom.xml

<!-- Spring --> 
<dependency> 
    <groupId>org.springframework</groupId> 
    <artifactId>spring-jms</artifactId> 
    <version>${org.springframework-version}</version> 
</dependency> 

<!-- ActiveMQ --> 
<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-all</artifactId> 
    <version>5.6.0</version> 
    <scope>compile</scope> 
</dependency> 

<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-pool</artifactId> 
    <version>5.6.0</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-core</artifactId> 
    <version>5.6.0</version> 
</dependency> 
+0

這很好,謝謝。 –

+0

我們在ActiveMQ中看到的問題(以及爲什麼我現在正在快速搜索替代方法):(a)當您關閉它時,它不會停止它的所有線程。 (b)其許多API自動啓動施工服務,同時也有類似start()的方法,使得API不清楚並可能對(a)有所貢獻。 (c)多線程的許多問題,到目前爲止,它們已經通過添加更多同步塊來「修復」。 (d)可疑的安全性,特別是在發現模式下使用時。 (專業提示:在辦公室周圍設置一個流氓消息隊列服務器以享受娛樂時光!) – Trejkaz

2

WebSphere MQ客戶端有能力執行multicast pub/sub。這提供了繞過隊列管理器的客戶端到客戶端功能,但需要隊列管理器來建立連接。

相關問題