我想在基於歷史事件的流中計算Flink中基於窗口的平均值(或由我定義的任何其他函數),因此流必須是事件時間(不處理基於時間):使用Flink和基於事件時間的流計算平均值
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
我已經找到了如何在攝入添加時間戳:
ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis)
但是,當我做計算(一個應用函數),它不工作時我只是按照我沒有使用EventTime的方式進行操作。我已經讀了一些關於我必須設置的水印:
val avg = stream
.keyBy("instrument")
.timeWindow(Time.seconds(10))
.apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{
val avg = values.map(_.val).sum/values.size
val dp = Datapoint(key.getField[String](0), avg)
out.collect(dp)
})
avg.print()
env.execute()
有人有一個簡單的Scala例子嗎?
問候,
安德烈亞斯
非常感謝!順便說一句:你知道Apache Flink的好書或者教程嗎? –
@AndreasVogler我認爲這本書對你很有用:https://data-artisans.com/download-introduction-apache-flink-book –
http://training.data-artisans.com/上的練習應該是很有幫助。 –