2013-05-16 59 views
5

在我的Storm拓撲結構中,在處理流時,我想延遲處理一些消息,直到某些未來的時間點。這樣做有什麼合理的選擇?Storm中的延遲隊列/消息處理

到目前爲止,我曾想過以下幾點:

  • 使用Java的Thread.sleep。 (然而,根據一些討論,這不是有效地利用雨水資源,建議這樣做。)
  • 使用延遲隊列...
  • 風暴是否有一些API用於延遲我忽略的消息?
  • 是否ZeroMQ提供延遲消息傳遞API,Storm(如果已修改)可以利用?
+0

你能告訴你爲什麼要這樣做嗎?如果你還沒有準備好處理這些東西,你爲什麼要把它傳遞給風暴拓撲? –

+2

我的第一個迴應:爲什麼問爲什麼?瞭解或回答問題很重要嗎?爲什麼延遲(或重新調度)元組有很多原因。我的元組處理並不全是關於數據的純功能轉換。就我而言,處理元組涉及捕獲系統外部的某些事情的狀態並與其他流集成。由於它隨着時間的推移而變化,我希望以受控的時間間隔捕捉該狀態。一個這樣的要求是不經常消耗外部資源。 –

回答

2

使用外部消息隊列來實現時間延遲隊列。

由於風暴是容錯和水平分佈,這將是有意義的選擇適合這種風格,比如消息隊列:

  • 卡夫卡
  • 亞馬遜SQS
  • 的RabbitMQ
5

我們正在使用拓撲滴答元組來批量處理掛起的元組。它基本上只是將它們存儲在每個正常元組的內存中,當它接收到一個滴答元組時,它會使用批量/流水線處理將它們處理到存儲/索引中。

如果卷尖峯檢測到所有元組都重定向到每個主機上的本地Redis存儲,然後在卷停止後被推回到拓撲處理中,我們還會在卷的巨量峯值的情況下使用redis。我們的情況可能不適用於你的,只是我的2c。