2017-02-01 57 views
2

每天我都會有一個CRON任務運行,它會爲SQS隊列填充許多需要實現的任務。因此(例如)每天早上9點,空隊列將收到約100條需要處理的消息。限制隊列工作人員的速率(例如:SQS)

我希望一個新的工作人員每秒鐘都快速啓動,直到隊列爲空。如果任何任務失敗,則將其放在隊列的後面以重新運行。

例如,如果每個任務佔用1.5秒來完成:

  • 1秒後,1名工人將開始消息A
  • 2秒後,1名工人可能仍在運行消息A和1名工人將開始100秒運行後消息B
  • ,1名工人可能仍在運行的消息XX和1名工人將拿起消息B,因爲它沒有以前
  • 101秒後,沒有更多的工人被傳播,直到第二天早上

有沒有辦法在AWS lambda中配置這種類型的基礎架構?

+0

這是一個有趣的用例。你能否告訴我們爲什麼需要1秒的速率限制(即使在廣義上)?它可以通過Executor完成,該Executor每秒生成1個線程並處理恰好1個SQS輪詢,然後在不爲空的情況下處理失敗的隊列 - 但我仍然對它希望的位置感到好奇。謝謝! –

+0

我們正在使用它來與第三方API進行通信,該API會限制我們使用他們的服務,每秒最多請求1個請求。 – bashaus

回答

1

對我來說,你會更好地發佈消息給SNS,而不是SQS,然後讓你的lambda函數訂閱SNS主題。

讓Lambda擔心需要爲響應負載而旋轉多少個「實例」。

以下是關於此方法的一篇博文,但Google可能會幫助您找到更接近您的實際使用案例的博文。

https://aws.amazon.com/blogs/mobile/invoking-aws-lambda-functions-via-amazon-sns/

+0

現在調用對我來說不是問題,這是限制速度的問題。有關這方面的任何建議? – bashaus

+0

你能解釋爲什麼你需要限制速度嗎?答案可能會導致更好的答案。 –

+0

基本上希望能夠發送消息到具有不合理速率限制的API – bashaus

1

的一種方式,但我不相信這是最佳的:

多數民衆贊成由CloudWatch的觸發事件(比如每秒或每10秒,這取決於你的速度極限)一個lambda。其中調查SQS接收(至多)N條消息,然後用每條消息「扇出」到另一個Lambda函數。


一些僞代碼:

# Lambda 1 (schedule by CloudWatch Event/e.g. CRON) 
def handle_cron(event, context): 
    # in order to get more messages, we might have to receive several times (loop) 
    for message in queue.receive_messages(MaxNumberOfMessages=10): 
     # Note: the Event InvocationType so we don't want to wait for the response! 
     lambda_client.invoke(FunctionName="foo", Payload=message.body, InvocationType='Event') 

# Lambda 2 (triggered only by the invoke in Lambda 1) 
def handle_message(event, context): 
    # handle message 
    pass 
+0

+1。我其實很喜歡這種設計。當您達到API限制時,比調整每封郵件的可見性好得多。 – sfratini

0

爲什麼不只是有,在上午9時開始投票SQS lambda函數,得到一個消息的時間和睡覺了每條消息之間的第二位死信隊列可以處理重試。在x秒後沒有收到來自SQS的消息後停止執行。

這是一個獨特的情況,你實際上並不需要並行處理。

+0

我想你也可以將N個Cloudwatch事件全部用同一個CRON限制到N秒,當你說「停止執行」時,你的意思是關閉事件嗎? –

+0

我的意思是有一個調查SQS隊列的lambda函數。收到消息後,它應該處理它,然後休息一會兒。然後輪詢另一個消息。如果輪詢消息並且在x秒內沒有收到任何消息,則停止執行lambda函數,直到下一個cron。 – Mark