
I can't find a generic way to compute a sum (or any aggregate function) over a given window for a list of columns in a DataFrame.

import spark.implicits._   // needed for toDF on an RDD of tuples

val inputDF = spark
  .sparkContext
  .parallelize(
    Seq(
      (1, 2, 1, 30, 100),
      (1, 2, 2, 30, 100),
      (1, 2, 3, 30, 100),
      (11, 21, 1, 30, 100),
      (11, 21, 2, 30, 100),
      (11, 21, 3, 30, 100)
    ),
    10)
  .toDF("c1", "c2", "offset", "v1", "v2")

inputDF.show
+---+---+------+---+---+
| c1| c2|offset| v1| v2|
+---+---+------+---+---+
|  1|  2|     1| 30|100|
|  1|  2|     2| 30|100|
|  1|  2|     3| 30|100|
| 11| 21|     1| 30|100|
| 11| 21|     2| 30|100|
| 11| 21|     3| 30|100|
+---+---+------+---+---+

Given a DataFrame like the one above, it is easy to compute the sum for a list of columns with a grouped aggregation, using a snippet like the one below -

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))

val aggKey = List("v1", "v2").map(c => sum(c).alias(c.trim))

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val outputDF = inputDF
  .groupBy(groupKey: _*)
  .agg(aggKey.head, aggKey.tail: _*)

outputDF.show
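
With the sample data above, each (c1, c2) group has three rows with v1 = 30 and v2 = 100, so outputDF.show should print something like this (group order may vary):

+---+---+---+---+
| c1| c2| v1| v2|
+---+---+---+---+
|  1|  2| 90|300|
| 11| 21| 90|300|
+---+---+---+---+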

But I can't seem to find a similar approach for aggregate functions over a window specification. So far I have only been able to solve this by specifying each column individually, as follows -

val outputDF2 = inputDF
  .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w))
  .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w))

If there is a way to do this aggregation over a dynamic list of columns, I would appreciate it. Thanks!


Have you tried using 'inputDF.types.foreach'? – philantrovert


Thanks. Could you elaborate on how I could use that in this case? My outputDF2 should contain all the columns from the input, plus the running sums for the columns specified in the list – Yash

Answer


I think I found an approach that works better than the one in the question above.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.expressions.Window

/**
 * Utility method that takes a DataFrame and a list of columns and returns the aggregated values for those columns
 * @param colsToAggregate   Seq[String] of columns in the input DataFrame to be aggregated
 * @param inputDF           input DataFrame
 * @param f                 aggregate function to apply to each column
 * @param partitionByColSeq Seq[String] of column names to partition the inputDF by before applying the aggregate
 * @param orderByColSeq     Seq[String] of column names to order the inputDF by before applying the aggregate
 * @param name_prefix       String to prefix the new columns with, to avoid collisions
 * @param name              new column names; defaults to the identity function, reusing the aggregated column names
 * @return                  output DataFrame
 */
def withRollingAggregateColumns(colsToAggregate: Seq[String],
                                inputDF: DataFrame,
                                f: String => Column,
                                partitionByColSeq: Seq[String],
                                orderByColSeq: Seq[String],
                                name_prefix: String,
                                name: String => String = identity): DataFrame = {

  val groupByKey = partitionByColSeq.map(x => col(x.trim))
  val orderByKey = orderByColSeq.map(x => col(x.trim))

  val w = Window.partitionBy(groupByKey: _*).orderBy(orderByKey: _*)

  // Fold over the column list, appending one aggregated column per element
  colsToAggregate
    .foldLeft(inputDF)(
      (df, elementInCols) => df
        .withColumn(
          name_prefix + "_" + name(elementInCols),
          f(elementInCols).over(w)
        )
    )
}

Here the utility method takes a DataFrame as input and appends new columns based on the provided function f. It uses 'withColumn' together with 'foldLeft' to iterate over the list of columns that need to be aggregated. To avoid any column-name collisions, it prepends the user-supplied prefix to the new aggregated columns.
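
As a rough usage sketch (the column list, the prefix, and the choice of a plain running sum here are just illustrative), the per-column withColumn calls from the question could then be replaced with a single call:

val outputDF2 = withRollingAggregateColumns(
  colsToAggregate = Seq("v1", "v2"),
  inputDF = inputDF,
  // any String => Column aggregate works here, e.g. to keep the bounded variant from the question:
  // c => sum(when(col("offset").between(-1, 1), col(c)).otherwise(0))
  f = c => sum(col(c)),
  partitionByColSeq = Seq("c1", "c2"),
  orderByColSeq = Seq("offset"),
  name_prefix = "cumulative"
)

outputDF2.show

With the default window frame implied by orderBy, each "cumulative_*" column holds a running sum of the corresponding input column within its (c1, c2) partition.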