2017-05-04 77 views
0

我不知道在Apache Storm中處理以下問題的最佳做法是什麼。風暴:不同大小的幾個滑動窗口的最小/最大聚合

我有一個噴嘴,生成一個帶有明確時間戳的整數值流。我們的目標是用三個滑動窗口在這個流以最小/最大聚集:

  • 最後一小時
  • 最後一天,即過去24小時

最後時刻很簡單:

topology.setBolt("1h", ...) 
    .shuffleGrouping("spout") 
    .withWindow(Duration.hours(1), Duration.seconds(10)) 
    .withTimestampField("timestamp")); 

但是,對於較長的時間段,我擔心窗口的隊列大小。當我像最後一小時聚合一樣直接從噴口中獲取元組時,每個元組都會在隊列中結束。

一種可能性是從預先聚合的「1h」螺栓中消耗元組。但是,因爲我使用明確的時間戳,所以從「1h」螺栓到達的延遲元組被忽略。 1小時的延遲不是一個選項,因爲這會延遲對窗口的評估。有沒有辦法在不影響結果及時性的情況下「延遲」元組?

當然,我也可以每小時存儲一個聚合數據,然後計算最近24小時的最小值,包括來自「1h」流的最新值。但我很好奇,如果有一種方法可以正確使用Storm手段來做到這一點。

更新1

由於arunmahadevan的答案,我改變了1H分鐘螺栓發出的所有元組在各個1H窗口中的最大時間戳最小的元組。這樣消費螺栓不會因爲遲到而丟棄元組。我還介紹了一個新字段original-timestamp以保留最小元組的原始時間戳。

更新2

我最後通過僅發射在1H分鐘螺栓的狀態變化中發現的甚至更好的方法。只要沒有接收到新的元組,Storm就不會在消費螺栓中增加時間,因此阻止了遲到問題。另外,我可以保留原始時間戳而不將其複製到單獨的字段中。

回答

0

我認爲週期性地發出min從「1h」到「24h」螺栓應該工作並且保持「24h」隊列大小被檢查。

如果配置滯後,則只有在該滯後後(即事件時間超過滑動間隔+滯後時)才調用螺栓的執行。

可以說,如果「1h」螺栓配置滯後1分鐘,則只有在事件時間超過02:01後纔會在01:00 - 02:00之間爲元組調用執行。 (即螺栓已經看到時間戳≥2:01的事件)。然而,執行只會在01:00和02:00之間收到元組。

現在,如果計算最後一小時的最小值,並將結果發送到滑動間隔爲1小時和滯後= 0的「24小時」螺栓,它將觸發一旦進入事件的時間戳穿過下一個小時。如果您在02:00發出01:00-02:00分鐘,一旦收到分鐘事件,「24h」窗口將觸發(對於前一天02:00至02:00之間的事件)因爲事件時間超過了下一個小時,並且配置的延遲爲0。

+0

確定您真正幫助我的答案中至關重要的部分是發出最低限度的最小時間戳窗口。 –

+0

您是否對如何實現平均聚合相同的想法?這是不同的,因爲我不能在1h螺栓上使用滑動窗口,因爲這會導致相同的元組被平均多次合併。有沒有辦法將基於翻轉窗口的預聚合與來自源噴口的最新元素結合起來?一旦它們被來自翻滾窗口的聚合物覆蓋,我就需要從隊列中驅逐源噴口元素。但在風暴中似乎沒有可能實施自定義驅逐政策...... –