2017-06-06 81 views
1

我有flink流,我在一些時間窗口上說幾件事說30秒。閃爍流窗口觸發器

這裏發生了什麼,它給了我結果我彙總以前的窗口以及。

說前30秒,我得到的結果10

下一個THIRY秒我要新鮮的結果,而不是我得到的最後一個窗口結果+新 等。

所以我的問題是我如何獲得每個窗口的新鮮結果。

+1

你可能會給我們一個你的代碼的想法。 – ImbaBalboa

回答

2

您需要使用purging觸發。你想要的是FIRE_AND_PURGE(發射和刪除窗口內容),默認的flink觸發器是FIRE(發射和保持窗口內容)。

input 
    .keyBy(...) 
    .timeWindow(Time.seconds(30)) 

    // The important part: Replace the default non-purging ProcessingTimeTrigger 
    .trigger(new PurgingTrigger[..., TimeWindow](ProcessingTimeTrigger)) 

    .reduce(...) 

要更深入的解釋看看爲TriggersFIRE vs FIRE_AND_PURGE

觸發器確定何時窗口(由窗口分配器形成)已準備好由窗口函數處理。每個WindowAssigner都帶有一個默認觸發器。如果默認觸發器不符合您的需求,則可以使用觸發器(...)指定自定義觸發器。

觸發器觸發時,它可以是FIRE或FIRE_AND_PURGE。儘管FIRE 保留了窗口的內容,但FIRE_AND_PURGE 刪除了其內容。默認情況下,預先實現的觸發器只是簡單地FIRE而不清除窗口狀態。