0

我有20-25列的conf文件列表,並且必須彙總第一個Notnull值。我嘗試了通過閱讀conf文件傳遞列列表和agg expr的函數。 我能夠獲得第一個功能,但無法找到如何首先指定ignoreNull值爲true。如何設置第一個函數的ignoreNulls標誌與列和聚合函數的地圖agg?

,我試過的代碼是

def groupAndAggregate(df: DataFrame, cols: List[String] , aggregateFun: Map[String, String]): DataFrame = { 
    df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun) 
} 

val df = sc.parallelize(Seq(
    (0, null, "1"), 
    (1, "2", "2"), 
    (0, "3", "3"), 
    (0, "4", "4"), 
    (1, "5", "5"), 
    (1, "6", "6"), 
    (1, "7", "7") 
)).toDF("grp", "col1", "col2") 


//first 
groupAndAggregate(df, List("grp"), Map("col1"-> "first", "col2"-> "COUNT")).show() 

+---+-----------+-----------+ 
|grp|first(col1)|count(col2)| 
+---+-----------+-----------+ 
| 1|   2|   4| 
| 0|   |   3| 
+---+-----------+-----------+ 

我需要3,以代替空的結果。 我使用的火花2.1.0和Scala 2.11

編輯1:

如果我使用下面的函數

import org.apache.spark.sql.functions.{first,count} 
df.groupBy("grp").agg(first(df("col1"), ignoreNulls = true), count("col2")).show() 

我得到我想要的結果,我們可以通過ignoreNulls真正的第一功能在地圖

回答

1

我已經能夠通過創建列的列表並將它傳遞給AGG函數來實現這一groupBy。前面的方法有一個問題,我無法命名列,因爲agg函數沒有返回輸出DF中列的順序,我已經重命名列表本身中的列。

 import org.apache.spark.sql.functions._ 

    def groupAndAggregate(df: DataFrame): DataFrame = { 
     val list: ListBuffer[Column] = new ListBuffer[Column]() 
     try { 

      val columnFound = getAggColumns(df) // function to return a Map[String, String] 

      val agg_func = columnFound.entrySet().toList. 
      foreach(field => 
       list += first(df(columnFound.getOrDefault(field.getKey, "")),ignoreNulls = true).as(field.getKey) 
      ) 

      list += sum(df("col1")).as("watch_time") 
      list += count("*").as("frequency") 

      val groupColumns = getGroupColumns(df) // function to return a List[String] 

      val output = df.groupBy(groupColumns.head, groupColumns.tail: _*).agg(
      list.head, list.tail: _* 
     ) 

      output 

     } catch { 
      case e: Exception => { 
      e.printStackTrace()} 
      null 
     } 
     } 
0

我認爲你應該使用na運營商和drop所有的null s在您進行聚合之前。

娜:DataFrameNaFunctions返回DataFrameNaFunctions缺乏數據的工作。

drop(cols:Array [String]):DataFrame返回一個新的DataFrame,該DataFrame將刪除包含指定列中的null或NaN值的行。

然後,該代碼將如下所示:

df.na.drop("col1").groupBy(...).agg(first("col1")) 

,這將影響count所以你必須單獨做count

0

Jacek已經給出了關於如何進行的一般想法和信息。我正在給你具體的想法。

first將始終返回dataframe的第一行。檢查null(在你的情況下它看起來是空的(""))的唯一方法是將filter取出不必要的rows,然後僅使用firstaggregation

所以,亞切克已經指出,你必須做兩次手術,一個是找到了過濾數據框中和第二的first是有countaggregation。最後你可以用groupByjoin。所以,你的代碼應該如下

def groupAndAggregate(df: DataFrame, cols: List[String] , aggregateFun: Map[String, String]): DataFrame = { 
    df.groupBy(cols.head, cols.tail: _*).agg(aggregateFun) 
} 

val df = sc.parallelize(Seq(
    (0, "", "1"), 
    (1, "2", "2"), 
    (0, "3", "3"), 
    (0, "4", "4"), 
    (1, "5", "5"), 
    (1, "6", "6"), 
    (1, "7", "7") 
)).toDF("grp", "col1", "col2") 

val df2 = df.filter($"col1" =!= "") //filter all empty string containing rows 
groupAndAggregate(df2, List("grp"), Map("col1" -> "first")) 
    .join(groupAndAggregate(df, List("grp"), Map("col2"-> "COUNT")), List("grp")) 
    .show(false) 

所以,最後你應該有所需的數據幀作爲

+---+-----------+-----------+ 
|grp|first(col1)|count(col2)| 
+---+-----------+-----------+ 
|1 |2   |4   | 
|0 |3   |3   | 
+---+-----------+-----------+