我正在處理的一個項目需要讀取來自SQS的消息,並且我決定使用Akka來分發這些消息的處理。由於SQS得到了Camel的支持,並且在Consumer類中有Akka中使用的功能,所以我想盡可能以這種方式實現端點和讀取消息,儘管我沒有看到很多例子人們這樣做。Akka,SQS和Camel的消費者投票率
我的問題是,我無法快速輪詢我的隊列,以保持隊列爲空或接近空。我最初的想法是,我可以讓消費者通過SQS以X/s的速率接收來自駱駝的消息。從那裏,我可以簡單地創造更多的消費者,以達到我需要處理消息的速度。
我的消費:
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
如圖所示,我設置delay=1
以及&maxMessagesPerPoll=10
,以提高信息的速度,但我無法產生多個消費者使用相同的終點。
我的文檔閱讀By default endpoints are assumed not to support multiple consumers.
,我相信這對於SQS終端也是如此,產卵多的消費者會給我只有一個消費者,其中系統運行一分鐘後,輸出消息是Count for actor: x
代替其他輸出Count for actor: 0
。
如果這是有用的;我能夠閱讀大約33條消息/秒與當前實施在單個消費者。
這是從Akka的SQS隊列中讀取消息的正確方法嗎?如果是這樣,我可以通過這種方式擴展到外部,這樣我可以將消息的消費速度提高到接近每秒900條消息的速度?