甲函數應爲多個列中的數據幀火花遷移SQL窗函數RDD爲更好的性能
def handleBias(df: DataFrame, colName: String, target: String = target) = {
val w1 = Window.partitionBy(colName)
val w2 = Window.partitionBy(colName, target)
df.withColumn("cnt_group", count("*").over(w2))
.withColumn("pre2_" + colName, mean(target).over(w1))
.withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D)))
.drop("cnt_group")
}
這可以很好地被寫爲在火花SQL如上所示被執行,並且一個for循環。然而這導致了很多洗牌(spark apply function to columns in parallel)。
小例子:
val df = Seq(
(0, "A", "B", "C", "D"),
(1, "A", "B", "C", "D"),
(0, "d", "a", "jkl", "d"),
(0, "d", "g", "C", "D"),
(1, "A", "d", "t", "k"),
(1, "d", "c", "C", "D"),
(1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
val columnsToDrop = Seq("col3TooMany")
val columnsToCode = Seq("col1", "col2")
val target = "TARGET"
val targetCounts = df.filter(df(target) === 1).groupBy(target)
.agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
(currentDF, colName) => handleBias(currentDF, colName)
}
result.drop(columnsToDrop: _*).show
我怎樣才能制定這個更有效使用RDD API? aggregateByKey
應該是一個好主意,但我仍然不清楚如何在這裏應用它來替代窗口函數。
(提供多一點上下文/更大的示例https://github.com/geoHeil/sparkContrastCoding)
編輯最初,我開始與Spark dynamic DAG is a lot slower and different from hard coded DAG其如下所示。好的是,每一列似乎都是獨立/並行的。缺點是連接(即使是一個300 MB的小數據集)會變得「太大」,並導致無響應的火花。
handleBiasOriginal("col1", df)
.join(handleBiasOriginal("col2", df), df.columns)
.join(handleBiasOriginal("col3TooMany", df), df.columns)
.drop(columnsToDrop: _*).show
def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
val pre1_1 = df
.filter(df(target) === 1)
.groupBy(col, target)
.agg((count("*")/df.filter(df(target) === 1).count).alias("pre_" + col))
.drop(target)
val pre2_1 = df
.groupBy(col)
.agg(mean(target).alias("pre2_" + col))
df
.join(pre1_1, Seq(col), "left")
.join(pre2_1, Seq(col), "left")
.na.fill(0)
}
此圖片與火花2.1.0,從Spark dynamic DAG is a lot slower and different from hard coded DAG圖像是用2.0.2
當施加 df.cache handleBiasOriginal緩存的DAG會有點簡單(」 col1「,df)。 ...
除了窗口函數,還有什麼其他的可能性可以優化SQL嗎? 充其量,如果SQL是動態生成的,那就太好了。
請參閱http://stackoverflow.com/questions/41169873/spark-dynamic-dag-is-a-lot-slower-and-different-from-hard-coded-dag以及我的編輯以上。最初,我開始使用group-by和join。這導致一項工作沒有在合理時間內完成/晶石似乎沒有執行任何操作。雖然聯接解決方案適用於小數據,但我無法使其與許多列一起工作。期待如何改進SQL的建議。 –
我不是說連接就是解決方案。我說的是,在大多數情況下,使用aggregateByKey的RDD會比較慢。你可以繼續使用我展示的鏈接和如何實現它的基本邏輯來嘗試aggregateByKey。 –
同時,您是否看到一種不使用慢窗口函數的方法,但仍然阻止使用連接? –