2016-12-15 21 views
2

我們需要在我們的Java EE應用程序中使用隊列,並且由於它是雲基礎應用程序(部署在OpenShift Online上),我們喜歡使用亞馬遜sqs。在@MessageDriven bean中使用amazon sqs - 池/並行處理

如果我正確理解了JMS/Java EE的接收部分的理論,一個@MessageDriven bean由Java EE容器管理,以便並行創建大量bean實例(根據最大池大小),如果傳入消息的數量很高。這當然是處理高負載的一大好處。

但是,我不明白我們如何在Java EE應用程序中以這種方式集成aws sqs。我知道異步接收器的例子來自http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html

class MyListener implements MessageListener { 

    @Override 
    public void onMessage(Message message) { 
     try { 
      // Cast the received message as TextMessage and print the text to screen. 
      if (message != null) { 
       System.out.println("Received: " + ((TextMessage) message).getText()); 
      } 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

然後:

// Create a consumer for the 'TestQueue'. 
MessageConsumer consumer = session.createConsumer(queue); 

// Instantiate and set the message listener for the consumer. 
consumer.setMessageListener(new MyListener()); 

// Start receiving incoming messages. 
connection.start(); 

這是官方異步接收器的例子 - 這不是一個@MessageDriven豆。很顯然,我們需要在某個地方進行身份驗證(通過創建一個SQSConnectionFactory,然後是一個連接,然後是一個會話 - 這在示例中也有詳細描述)。
但我強烈認爲這個例子不會並行處理消息 - 即只有一個bean實例正在處理隊列,這對於可伸縮的高負載應用程序來說並不是一個好的解決方案。

a)我們如何才能通過Amazon SQS實現真正的Java EE方式? 我只是發現春天的例子。但它必須是Java EE 7. b)我們使用Wildfly(現在是8.2.1)。是否也可以讓Wildfly在內部管理與AWS和應用程序的連接,我們可以像使用應用程序服務器管理的隊列一樣使用隊列(與數據源訪問數據庫的方法相同)?

結論後,得到的回答從stdunbar
它似乎沒有可能在一個「適當的方式」,是我喜歡做的事。所以我該怎麼做?實施ManagedExecutorService作爲stdunbar描述'包裹'的隊列? - 但是這意味着有一個本地隊列,這對於一個應用程序來說不是一個好的情況,應該是可擴展的! 什麼是替代品?我們正在OpenShift Online上運行應用程序。使用例如自己的裝備來實例化自己的裝備可能是不利的。 ApacheMQ Cartridge ......當然還有很多不利因素,比如成本,我們對「基礎架構」負責。

說實話,我在這種情況下,真的很失望AWS的...

回答

2

我不認爲我的解決方案是正確的JAVA EE,但在我的情況下,它的工作原理。

配置:

@Singleton 
public class SqsMessageManager 
{ 
    private Integer numberOfReceivers = 3; 

    public static SQSConnection connection = null; 
    public static Queue queue = null; 

    @Inject 
    SqsMessageReceiver sqsMessageReceiver; 

    public void init() 
    { 
     try 
     { 
      SQSConnectionFactory connectionFactory = 
        SQSConnectionFactory.builder() 
          .withRegion(Region.getRegion(Regions.EU_WEST_1)) 
          .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider()) 
          .build(); 

      connection = connectionFactory.createConnection(); 

      queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue"); 

      for (int i = 0; i < numberOfReceivers; i++) 
       connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver); 

      connection.start(); 
     } 
     catch (JMSException e) 
     { 
      e.getStackTrace(); 
     } 
    } 
} 

然後發件人:

@Dependent 
public class SqsMessageSender 
{ 
    MessageProducer producer = null; 
    Session senderSession = null; 

    @PostConstruct 
    public void createProducer(){ 
     try 
     { 
      // open new session and message producer 
      senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      producer = senderSession.createProducer(SqsMessageManager.queue); 
     }catch(JMSException | NullPointerException e){ 
      ; 
     } 
    } 

    @PreDestroy 
    public void destroy(){ 
     try 
     { 
      // close session 
      producer.close(); 
      senderSession.close(); 
     }catch(JMSException e){ 

     } 
    } 

    // sends a message to aws sqs queue 
    public void sendMessage(String txt) 
    { 
     try 
     { 
      TextMessage textMessage = senderSession.createTextMessage(txt); 
      producer.send(textMessage); 
     } 
     catch (JMSException e) 
     { 
      e.getStackTrace(); 
     } 
    } 
} 

和接收器:

@Dependent 
public class SqsMessageReceiver implements MessageListener 
{ 
    public void onMessage(Message inMessage) { 
     ... 
    } 
} 
+0

我不完全明白,你的** numberOfReceivers **是如何工作的。你在同一個對象上創建了多個監聽器(你注入'sqsMessageReceiver',實際上是一個實例)? – badera

3

據一些老docs I found

容器允許消息驅動bean類的實例是併發運行,從而允許併發處理消息流。

通過使用亞馬遜JMS的整合,再加上聲明 MDB,你要善於去。我不會使用setMessageListener接口。您可以使用JMS的聲明版本,因爲您使用的是Wildfly 8。x/EE7:

@MessageDriven(activationConfig = { /* your config - i.e. queue name, etc */ }) 
public class MyListener implements MessageListener { 
    @Override 
    public void onMessage(Message message) { 
    } 
} 

這允許容器根據需要創建儘可能多的實例。請注意,可能需要在Wildfly中爲JMS參數進行一些調整。

請注意,讓Amazon庫負責讀取SQS隊列。我已經開始翻閱自己的讀者,認爲我可以編寫它。但是我發現,您不能使用帶有從隊列中讀取的多個線程的AWS Java庫,因爲幾乎每次都會有重複的讀取。我有4個線程讀取SQS隊列,並會得到4條相同的消息。我終於變成了一個閱讀器,將消息放入一個LinkedBlockingDeque中,以供其他一些線程使用。

我所展示的一切都是純Java/EE。

編輯
與亞馬遜SQS/JMS集成玩弄了一段,我覺得如果你使用它,你是在浪費你的時間。它僅適用於JMS 1.1,因此它也使用帶有JNDI的舊JMS語法。此外,它只適用於隊列,不適用於主題。

我強烈建議創建自己的實現。 ManagedExecutorService運行帶有短SQS讀取超時的隊列讀取器線程。每次循環都將從SQS隊列中讀取並將消息放入JMS隊列或主題中。

對不起,你已經得到了這個希望 - 亞馬遜剛剛沒有被維持足夠的價值。

+0

謝謝,stdunbar。我很高興聽到這應該起作用 - 但是,我不知道如何發揮所需的參數(憑據)。我在哪裏放置AWS證書?我該如何實現你的注意事項'作爲一個便箋,讓亞馬遜圖書館負責讀取SQS隊列? - 我認爲用@ MessageDriven註釋會使應用程序服務器讀取隊列?! - 我明白,如果您可以更具體地瞭解如何將AWS SQS集成到您的代碼中。 – badera

+0

我懷疑你是對的。這些都不是好消息。如果我理解正確,那只是SDK「不是最新的」,而不是服務本身? - 我用一個結論和一個進一步的問題更新了這個問題。 – badera

+0

標有b)的問題仍未解答。希望能得到更多的想法,該怎麼辦,我開始賞金......至少感謝[+1]! – badera

0

Payara Cloud Connectors似乎是相當新的,但看起來很有希望。不知道這是否也適用於其他容器。據我瞭解,它基於JCA適配器。