2013-11-04 47 views
6

我正在處理的一個項目需要讀取來自SQS的消息,並且我決定使用Akka來分發這些消息的處理。由於SQS得到了Camel的支持,並且在Consumer類中有Akka中使用的功能,所以我想盡可能以這種方式實現端點和讀取消息,儘管我沒有看到很多例子人們這樣做。Akka,SQS和Camel的消費者投票率

我的問題是,我無法快速輪詢我的隊列,以保持隊列爲空或接近空。我最初的想法是,我可以讓消費者通過SQS以X/s的速率接收來自駱駝的消息。從那裏,我可以簡單地創造更多的消費者,以達到我需要處理消息的速度。

我的消費:

import akka.camel.{CamelMessage, Consumer} 
import akka.actor.{ActorRef, ActorPath} 

class MyConsumer() extends Consumer { 
    def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)" 
    var count = 0 

    def receive = { 
    case msg: CamelMessage => { 
     count += 1 
    } 
    case _ => { 
     println("Got something else") 
    } 
    } 

    override def postStop(){ 
    println("Count for actor: " + count) 
    } 
} 

如圖所示,我設置delay=1以及&maxMessagesPerPoll=10,以提高信息的速度,但我無法產生多個消費者使用相同的終點。

我的文檔閱讀By default endpoints are assumed not to support multiple consumers.,我相信這對於SQS終端也是如此,產卵多的消費者會給我只有一個消費者,其中系統運行一分鐘後,輸出消息是Count for actor: x代替其他輸出Count for actor: 0

如果這是有用的;我能夠閱讀大約33條消息/秒與當前實施在單個消費者。

這是從Akka的SQS隊列中讀取消息的正確方法嗎?如果是這樣,我可以通過這種方式擴展到外部,這樣我可以將消息的消費速度提高到接近每秒900條消息的速度?

回答

5

不幸的是,Camel目前不支持在SQS上並行消費消息。

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

爲了解決這個問題,我寫我自己的演員輪詢使用AWS-Java的SDK一批消息SQS。

def receive = { 
    case BeginPolling => { 
     // re-queue sending asynchronously 
     self ! BeginPolling 
     // traverse the response 
     val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry] 
     val messages = sqs.receiveMessage(receiveMessageRequest).getMessages 
     messages.toList.foreach { 
     node => { 
      deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle)) 
      //log.info("Node body: {}", node.getBody) 
      filterSupervisor ! node.getBody 
     } 
     } 
     if(deleteEntryList.size() > 0){ 
     val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList) 
     sqs.deleteMessageBatch(deleteMessageBatchRequest) 
     } 
    } 

    case _ => { 
     log.warning("Unknown message") 
    } 
    } 

雖然我不能肯定這是否是最好的執行,它當然可以在使請求不會不斷衝擊空隊列,但它適合我目前能查詢的需求得到改善來自同一隊列的消息。

從SQS獲得約133(訊息/秒)/演員與此。

1

駱駝2.15支持併發消費者,雖然不知道這是多麼有用,我不知道是否akka駱駝支持2.15,我不知道是否有一位消費者演員即使有多個消費者也會有所作爲。