Spark/Scala: repeated calls to withColumn() using the same function on multiple columns

I currently have code in which I repeatedly apply the same procedure to multiple DataFrame columns via a chain of .withColumn calls, and I would like to write a function that streamlines this:

val newDF = oldDF 
    .withColumn("cumA", sum("A").over(Window.partitionBy("ID").orderBy("time"))) 
    .withColumn("cumB", sum("B").over(Window.partitionBy("ID").orderBy("time"))) 
    .withColumn("cumC", sum("C").over(Window.partitionBy("ID").orderBy("time"))) 
    //.withColumn(...) 

I would like either something like this:

def createCumulativeColums(cols: Array[String], df: DataFrame): DataFrame = { 
    // Implement the above cumulative sums, partitioning, and ordering 
} 

Or, better:

def withColumns(cols: Array[String], df: DataFrame, f: function): DataFrame = { 
    // Implement a udf/arbitrary function on all the specified columns 
} 

Answer

In a case like yours, where you compute cumulative sums over columns aggregated by a key, you can use select with varargs, including *:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import spark.implicits._

df.select($"*" +: Seq("A", "B", "C").map(c =>
    sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
): _*)

This:

  • maps column names to window expressions with Seq("A", ...).map(...)
  • prepends all pre-existing columns with $"*" +: ...
  • unpacks the combined sequence with ... : _*
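
Since the window specification is identical for every column, it can also be built once and reused. A minimal sketch, assuming the same df and the spark.implicits._ import from above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Build the shared window specification once instead of once per column.
val w = Window.partitionBy("ID").orderBy("time")

df.select($"*" +: Seq("A", "B", "C").map(c => sum(c).over(w).alias(s"cum$c")): _*)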

The select-based version can be generalized as:

import org.apache.spark.sql.{Column, DataFrame} 

/** 
* @param cols a sequence of columns to transform 
* @param df an input DataFrame 
* @param f a function to be applied on each col in cols 
*/ 
def withColumns(cols: Seq[String], df: DataFrame, f: String => Column) = 
    df.select($"*" +: cols.map(c => f(c)): _*) 

If you find the withColumn syntax more readable, you can use foldLeft:

Seq("A", "B", "C").foldLeft(df)((df, c) => 
    df.withColumn(s"cum$c", sum(c).over(Window.partitionBy("ID").orderBy("time"))) 
) 

which can be generalized, for example, to:

/** 
* @param cols a sequence of columns to transform 
* @param df an input DataFrame 
* @param f a function to be applied on each col in cols 
* @param name a function mapping from input to output name. 
*/ 
def withColumns(cols: Seq[String], df: DataFrame, 
    f: String => Column, name: String => String = identity) = 
    cols.foldLeft(df)((df, c) => df.withColumn(name(c), f(c))) 
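
A minimal usage sketch of this foldLeft-based variant, again assuming the earlier imports; here the name function derives each output name from its input column:

// Hypothetical usage: cumulative sums for A, B, C written to cumA, cumB, cumC
// via the name mapping argument.
val w = Window.partitionBy("ID").orderBy("time")
val result = withColumns(Seq("A", "B", "C"), df,
    c => sum(c).over(w), c => s"cum$c")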