2017-08-10 37 views

回答

0

你可以試試這個。可以給你解決。

val text = senv.socketTextStream("localhost", 9999) 
val counts = text.map {(m: String) => (m.split(",")(0), 1) } 
    .keyBy(0) 
    .timeWindow(Time.seconds(10), Time.seconds(5)) 
    .sum(1) 
counts.print 
senv.execute("ProcessingTime processing example") 
+0

對於我的數據集,它是JSON格式,因此使用我們不能分裂它「」。這是我的理解。如果我錯了,請清除我的疑惑。 –

1

進行計數聚集的最有效方法是ReduceFunction。但是,reduce有輸入和輸出類型必須相同的限制。所以,你必須將窗口前輸入轉換爲Int

val socket: DataStream[(String)] = ??? 

val cnts: DataStream[Int] = socket 
    .map(_ => 1)     // convert to 1 
    .timeWindowAll(Time.seconds(5)) // group into 5 second windows 
    .reduce((x, y) => x + y)  // sum 1s to count 
+0

我試過這個,但是map()函數, map(_ => 1), 導致錯誤 「找不到隱含的org.apache.flink.api.common類型的證據參數值。 typeinfo.TypeInformation [Int]「 –

+0

您是否導入org.apache.flink.streaming.api.scala._? –