在弗林克定義一個時間窗口,方法如下:如何算阿帕奇弗林克在給定時間處理的記錄數窗口
val lines = socket.timeWindowAll(Time.seconds(5))
我如何能夠計算在5特定窗口的記錄數秒?
在弗林克定義一個時間窗口,方法如下:如何算阿帕奇弗林克在給定時間處理的記錄數窗口
val lines = socket.timeWindowAll(Time.seconds(5))
我如何能夠計算在5特定窗口的記錄數秒?
你可以試試這個。可以給你解決。
val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
counts.print
senv.execute("ProcessingTime processing example")
進行計數聚集的最有效方法是ReduceFunction
。但是,reduce
有輸入和輸出類型必須相同的限制。所以,你必須將窗口前輸入轉換爲Int
:
val socket: DataStream[(String)] = ???
val cnts: DataStream[Int] = socket
.map(_ => 1) // convert to 1
.timeWindowAll(Time.seconds(5)) // group into 5 second windows
.reduce((x, y) => x + y) // sum 1s to count
我試過這個,但是map()函數, map(_ => 1), 導致錯誤 「找不到隱含的org.apache.flink.api.common類型的證據參數值。 typeinfo.TypeInformation [Int]「 –
您是否導入org.apache.flink.streaming.api.scala._? –
對於我的數據集,它是JSON格式,因此使用我們不能分裂它「」。這是我的理解。如果我錯了,請清除我的疑惑。 –