2017-06-15 33 views
0

我需要應用一個聚合函數的數據流與apache火花流(沒有APACHE SPARK流傳輸SQL)。應用聚合函數與火花流scala

在我的情況下,我有一個kafka生產者發送JSON格式的消息。 格式是{'a': String, 'b': String, 'c': Integer, 'd': Double}

我需要聚合在屬性'a''b'每5秒,我必須對其他2個屬性(例如平均的,或點心,或最小值,或最大值)應用聚合函數。

我該怎麼做?

感謝

+0

你是否已經嘗試了'reduce'功能? https://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams – maasg

+0

問題是reduce函數需要2個參數並返回1.我需要具有相同的模式。換句話說,如果我的初始模式是'{'a':String,'b':String,'c':Integer,'d':Double}'結果模式(帶有AVG聚合函數)應該是'{' GROUPBYa':String,'GROUPBYb':String,'AVGc':Integer,'AVGd':Double}' –

+0

您也可以使用'transform'或'foreachRDD'並應用任意RDD函數,或者轉換爲Dataframes並使用數據幀聚合API – maasg

回答

1

爲了讓您一開始,你可以接近聚集這樣的:

import sparkSession.implicits._ 

jsonDstream.foreachRDD{jsonRDD => 
    val df = sparkSession.read.json(jsonRDD) 
    val aggr = df.groupBy($"a", $"b").agg(avg($"c")) 
    ... do something with aggr ... 
}