2017-12-03 197 views
0

我想在基於歷史事件的流中計算Flink中基於窗口的平均值(或由我定義的任何其他函數),因此流必須是事件時間(不處理基於時間):使用Flink和基於事件時間的流計算平均值

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

我已經找到了如何在攝入添加時間戳:

ctx.collectWithTimestamp(Datapoint(instrument, bid, ask), time.getMillis) 

但是,當我做計算(一個應用函數),它不工作時我只是按照我沒有使用EventTime的方式進行操作。我已經讀了一些關於我必須設置的水印:

val avg = stream 
    .keyBy("instrument") 
    .timeWindow(Time.seconds(10)) 
    .apply((key: Tuple, window: TimeWindow, values: Iterable[Datapoint], out: Collector[Datapoint])=>{ 
    val avg = values.map(_.val).sum/values.size 
    val dp = Datapoint(key.getField[String](0), avg) 
    out.collect(dp) 
    }) 

avg.print() 
env.execute() 

有人有一個簡單的Scala例子嗎?

問候,
安德烈亞斯

回答

0

水印是一種有效地與早期的時間戳的所有事件都(可能)已經抵達斷言時間戳。基於事件時間的Windows依賴水印來知道窗口何時完成。到目前爲止,最常見的水印策略是假定事件以一定的有限延遲到達。

如果要發射的數據源水印(服用時),見Source Functions with Timestamps and Watermarks,但它是那樣簡單

ctx.emitWatermark(new Watermark(datapoint.getWatermarkTime)) 

如果,另一方面,要解決這個問題之外來源,見Timestamp Assigners/Watermark GeneratorsAssigners allowing a fixed amount of lateness。你可以簡單地做這樣的事情:

stream 
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Datapoint](Time.seconds(10))(_.getTimestamp)) 
    .keyBy("instrument") 
    ... 

我鏈接到的文檔有更詳細的例子在斯卡拉。

+0

非常感謝!順便說一句:你知道Apache Flink的好書或者教程嗎? –

+0

@AndreasVogler我認爲這本書對你很有用:https://data-artisans.com/download-introduction-apache-flink-book –

+0

http://training.data-artisans.com/上的練習應該是很有幫助。 –