我希望在計數達到100次或每5秒翻轉處理時間後使Windows完成?也就是說,當元素達到100時,觸發Windows計算,但是如果元素沒有達到100,但時間經過5秒,它也會觸發Windows計算,就像下面兩個觸發器的組合一樣:我們可以在Flink中結合兩者並計數和處理時間觸發器嗎?
.countWindow(100)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
我希望在計數達到100次或每5秒翻轉處理時間後使Windows完成?也就是說,當元素達到100時,觸發Windows計算,但是如果元素沒有達到100,但時間經過5秒,它也會觸發Windows計算,就像下面兩個觸發器的組合一樣:我們可以在Flink中結合兩者並計數和處理時間觸發器嗎?
.countWindow(100)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
有沒有超級簡單方式與當前弗林克API來做到這一點。
您的用例需要狀態(用於計數)和計時器的組合。您可以使用自定義的Trigger或使用ProcessFunction來完成此操作。
對於使用windows plus自定義觸發器的方法,查看ProcessingTimeTrigger和CountTrigger的implementations將會很有幫助,因爲您基本上想要混合兩者。
ProcessFunction是一個較低級別的構建塊,它將託管狀態與定時器結合在一起,這正是您所需要的,所以這可能更容易,尤其是如果您已經知道如何使用Flink's managed state。
順便說一句,online Flink training materials包括幻燈片和練習用於實現自定義觸發器和使用ProcessFunction。
非常感謝你對你的好心幫,我會做你的材料的一些檢查,並且將反應在這裏,如果我有一些困難。 –
我注意到你的例子http://dataartisans.github.io/flink-training/dataStream/5-intro.html與https://ci.apache.org/projects/flink/的官方例子不同flink-docs-release-1.3/dev/stream/process_function.html的函數onTimer,在官方的例子中有timestamp == result.lastModified + XXX),但是在你的例子中沒有XXX。 –
也可以問一下ctx.timestamp()是什麼意思?從哪裏來,何時分配給ctx?從官方的例子看來,ctx.timestamp()每60秒更改一次,它是一個水印嗎? –