我有20-25列的conf文件列表,並且必須彙總第一個Notnull值。我嘗試了通過閱讀conf文件傳遞列列表和agg expr的函數。 我能夠獲得第一個功能,但無法找到如何首先指定ignoreNull值爲true。如何設置第一個函數的ignoreNulls標誌與列和聚合函數的地圖agg?
,我試過的代碼是
def groupAndAggregate(df: DataFrame, cols: List[String] , aggregateFun: Map[String, String]): DataFrame = {
df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun)
}
val df = sc.parallelize(Seq(
(0, null, "1"),
(1, "2", "2"),
(0, "3", "3"),
(0, "4", "4"),
(1, "5", "5"),
(1, "6", "6"),
(1, "7", "7")
)).toDF("grp", "col1", "col2")
//first
groupAndAggregate(df, List("grp"), Map("col1"-> "first", "col2"-> "COUNT")).show()
+---+-----------+-----------+
|grp|first(col1)|count(col2)|
+---+-----------+-----------+
| 1| 2| 4|
| 0| | 3|
+---+-----------+-----------+
我需要3,以代替空的結果。 我使用的火花2.1.0和Scala 2.11
編輯1:
如果我使用下面的函數
import org.apache.spark.sql.functions.{first,count}
df.groupBy("grp").agg(first(df("col1"), ignoreNulls = true), count("col2")).show()
我得到我想要的結果,我們可以通過ignoreNulls真正的第一功能在地圖