2015-10-26 123 views
1

我正在運行Apache Artemis經紀人版本1.1.0。Apache Artemis經紀人不發送消息給SwiftMQ AMQP客戶端

我使用這樣的SwiftMQ客戶端創建一個會話。

AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT); 
Connection connection = new Connection(ctx, getBrokerHost(), getBrokerPort(), getUserName(), getPassword()); 
connection.setExceptionListener((exc) -> log().error(exc, "Problem with AMQP connection to {}", getBrokerHost())); 
connection.connect(); 

// Parameters are maximum number of unsettled input and output messages 
session = connection.createSession(50, 50); 

在一個過程中,我創建由

session.createProducer(queueName, QoS.AT_LEAST_ONCE) 

生產者在單獨的過程中,我創建一個消費者通過

session.createProducer(queueName, QoS.AT_LEAST_ONCE) 

隨着生產者,我可以由

發送消息
AMQPMessage result = new AMQPMessage(); 
result.setAmqpValue(new AmqpValue(new AMQPString(value))); 
producer.send(result); 

This works。使用JMX,我可以看到隊列中有消息。當我嘗試通過

AMQPMessage message = consumer.receive(); 

AMQPMessage message = consumer.receiveNoWait(listener); 

讀他們在那裏listener將消息發送到讀線程調用receive S的一個,電話要麼塊或沒有返回信息,根據在版本上。

JMX顯示代理認爲隊列中有消費者,隊列中有消息。消息未在交付時列出,並且隊列未暫停。隊列中沒有過濾器。

如果重要,消費者和生產者在同一主機上運行。該代理正在與生產者和使用者相同的主機上運行於Docker容器中(假設您不知道Docker,請考慮VM),但它具有不同的IP地址。

我加了consumer.setLinkCredit(100),但是這並沒有影響任何東西。

可能會有一些Artemis/SwiftMQ不兼容,但這似乎不太可能,因爲我可以發送消息給代理。

經紀人幾乎是一個標準的經紀人。它創建於

/opt/apache-artemis-1.1.0/bin/artemis create artemis \ 
             --home /opt/apache-artemis \ 
             --user xxx \ 
             --password yyy \ 
             --cluster-user www \ 
             --cluster-password zzz \ 
             --allow-anonymous 

第二個用戶,角色amq已被添加。預定義了兩個持久隊列。其中之一就是這裏使用的隊列。

是否有可能無法傳遞消息的原因?還有什麼我可以有用地檢查,以瞭解爲什麼這不起作用?

編輯

我試圖與幾個不同的隊列名稱。

  • frobGetter
  • frobRequest
  • jms.queue.ExpiryQueue
  • jms.queue.DLQ
  • jms.queue。PlanningQueue

這些名字都不起作用。

+0

你有什麼作爲queueName? –

回答

0

artemis隊列上有一個前綴。通常你必須指定名稱爲jms.queue.QueueName ...

我最初在我們調查什麼是你的問題時添加了這個答案。 (我與Tim Bish合作,他給出了另一個答案)。最後情況並非如此,但我希望在這裏保留這個答案,因爲最終某些用戶可能會使用谷歌搜索。

1

我以前從未使用過SwiftMQ客戶端,但我可以使用下面顯示的代碼使用Artemis v1.1.0和ActiveMQ v5.12.1。竅門似乎是,您需要在createConsumer調用中指定鏈接功勞,consumer.setLinkCredit(100)似乎對消費者沒有任何影響,因爲在進行此調用時沒有向代理髮出信用。

package org.apache.activemq.demo; 

import java.util.concurrent.TimeUnit; 

import com.swiftmq.amqp.AMQPContext; 
import com.swiftmq.amqp.v100.client.Connection; 
import com.swiftmq.amqp.v100.client.Consumer; 
import com.swiftmq.amqp.v100.client.ExceptionListener; 
import com.swiftmq.amqp.v100.client.Producer; 
import com.swiftmq.amqp.v100.client.QoS; 
import com.swiftmq.amqp.v100.client.Session; 
import com.swiftmq.amqp.v100.generated.messaging.message_format.AmqpValue; 
import com.swiftmq.amqp.v100.messaging.AMQPMessage; 
import com.swiftmq.amqp.v100.types.AMQPString; 
import com.swiftmq.amqp.v100.types.AMQPType; 

public class SwitftMQTest { 

    public void run() throws Exception { 
     AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT); 
     Connection connection = new Connection(ctx, "localhost", 5672, "guest", "guest"); 
     connection.setExceptionListener(new ExceptionListener() { 

      @Override 
      public void onException(Exception error) { 
       System.out.println("Problem with AMQP connection: " + error.getMessage()); 
      } 
     }); 

     connection.connect(); 

     // Parameters are maximum number of unsettled input and output messages 
     Session session = connection.createSession(50, 50); 

     Consumer consumer = session.createConsumer("jms.queue.TEST", 100, QoS.AT_LEAST_ONCE, false, null); 

     Producer producer = session.createProducer("jms.queue.TEST", QoS.AT_LEAST_ONCE); 
     AMQPMessage message = new AMQPMessage(); 
     message.setAmqpValue(new AmqpValue(new AMQPString("Test String"))); 

     producer.send(message); 
     producer.close(); 

     System.out.println("Attempting to read a message."); 
     message = consumer.receive(TimeUnit.MINUTES.toMillis(1)); 
     if (message != null) { 
      AMQPType payload = message.getAmqpValue().getValue(); 
      System.out.println("Message payload = " + payload.toString()); 
     } else { 
      System.out.println("Did not get a message in the time given."); 
     } 

     connection.close(); 
    } 

    public static void main(String[] args) { 
     SwitftMQTest testCase = new SwitftMQTest(); 

     try { 
      testCase.run(); 
     } catch (Exception ex) { 
      System.out.println("Problem with AMQP connection: " + ex.getMessage()); 
     } 
    } 
}