2017-01-03 23 views
4

甲函數應爲多個列中的數據幀火花遷移SQL窗函數RDD爲更好的性能

def handleBias(df: DataFrame, colName: String, target: String = target) = { 
    val w1 = Window.partitionBy(colName) 
    val w2 = Window.partitionBy(colName, target) 

    df.withColumn("cnt_group", count("*").over(w2)) 
     .withColumn("pre2_" + colName, mean(target).over(w1)) 
     .withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D))) 
     .drop("cnt_group") 
    } 

這可以很好地被寫爲在火花SQL如上所示被執行,並且一個for循環。然而這導致了很多洗牌(spark apply function to columns in parallel)。

小例子:

val df = Seq(
    (0, "A", "B", "C", "D"), 
    (1, "A", "B", "C", "D"), 
    (0, "d", "a", "jkl", "d"), 
    (0, "d", "g", "C", "D"), 
    (1, "A", "d", "t", "k"), 
    (1, "d", "c", "C", "D"), 
    (1, "c", "B", "C", "D") 
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 

    val columnsToDrop = Seq("col3TooMany") 
    val columnsToCode = Seq("col1", "col2") 
    val target = "TARGET" 

    val targetCounts = df.filter(df(target) === 1).groupBy(target) 
    .agg(count(target).as("cnt_foo_eq_1")) 
    val newDF = df.join(broadcast(targetCounts), Seq(target), "left") 

    val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) { 
    (currentDF, colName) => handleBias(currentDF, colName) 
    } 

    result.drop(columnsToDrop: _*).show 

我怎樣才能制定這個更有效使用RDD API? aggregateByKey應該是一個好主意,但我仍然不清楚如何在這裏應用它來替代窗口函數。

(提供多一點上下文/更大的示例https://github.com/geoHeil/sparkContrastCoding

編輯

最初,我開始與Spark dynamic DAG is a lot slower and different from hard coded DAG其如下所示。好的是,每一列似乎都是獨立/並行的。缺點是連接(即使是一個300 MB的小數據集)會變得「太大」,並導致無響應的火花。

handleBiasOriginal("col1", df) 
    .join(handleBiasOriginal("col2", df), df.columns) 
    .join(handleBiasOriginal("col3TooMany", df), df.columns) 
    .drop(columnsToDrop: _*).show 

    def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = { 
    val pre1_1 = df 
     .filter(df(target) === 1) 
     .groupBy(col, target) 
     .agg((count("*")/df.filter(df(target) === 1).count).alias("pre_" + col)) 
     .drop(target) 

    val pre2_1 = df 
     .groupBy(col) 
     .agg(mean(target).alias("pre2_" + col)) 

    df 
     .join(pre1_1, Seq(col), "left") 
     .join(pre2_1, Seq(col), "left") 
     .na.fill(0) 
    } 

此圖片與火花2.1.0,從Spark dynamic DAG is a lot slower and different from hard coded DAG圖像是用2.0.2 toocomplexDAG

當施加 df.cache handleBiasOriginal緩存的DAG會有點簡單(」 col1「,df)。 ...

除了窗口函數,還有什麼其他的可能性可以優化SQL嗎? 充其量,如果SQL是動態生成的,那就太好了。

caching

回答

1

這裏的要點是避免不必要的洗牌。現在,您的代碼將爲您要包含的每個列進行兩次洗牌,並且結果數據佈局不能在列之間重複使用。

爲簡單起見,我假設target始終是二進制({0,1}),並且您使用的所有其餘列都是StringType。此外,我假設列的基數足夠低,以便將結果分組並在本地處理。您可以調整這些方法來處理其他情況,但它需要更多的工作。

RDD API

  • 從廣角到長

    重塑數據:

    import org.apache.spark.sql.functions._ 
    
    val exploded = explode(array(
        (columnsToDrop ++ columnsToCode).map(c => 
        struct(lit(c).alias("k"), col(c).alias("v"))): _* 
    )).alias("level") 
    
    val long = df.select(exploded, $"TARGET") 
    
  • aggregateByKey,重塑和收集:

    import org.apache.spark.util.StatCounter 
    
    val lookup = long.as[((String, String), Int)].rdd 
        // You can use prefix partitioner (one that depends only on _._1) 
        // to avoid reshuffling for groupByKey 
        .aggregateByKey(StatCounter())(_ merge _, _ merge _) 
        .map { case ((c, v), s) => (c, (v, s)) } 
        .groupByKey 
        .mapValues(_.toMap) 
        .collectAsMap 
    
  • 您可以使用lookupŧ o獲得單個列和級別的統計信息。例如:

    lookup("col1")("A") 
    
    org.apache.spark.util.StatCounter = 
        (count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000) 
    

    爲您提供col1數據,A水平。基於二進制TARGET的假設,這個信息是完整的(你得到兩個類的計數/分數)。

    您可以使用查找這樣生成SQL表達式或將它傳遞給udf並將其應用在各個列。

數據幀API

  • 轉換數據,只要對RDD API。基於層次
  • 計算聚集:

    val stats = long 
        .groupBy($"level.k", $"level.v") 
        .agg(mean($"TARGET"), sum($"TARGET")) 
    
  • 根據自己的喜好,你可以重塑這個能夠高效率地加入或轉換到本地收集和類似於RDD解決方案。

0

使用aggregateByKey 上aggregateByKey一個簡單的解釋可以發現here。基本上你使用兩個函數:一個在分區內工作,另一個在分區之間工作。

您需要在第一列執行類似聚合的操作,並在內部使用映射爲第二列的每個元素構建一個數據結構,以在其中彙總和收集數據(當然,如果需要,可以執行兩個aggregateByKey) 。 這不會解決你想要使用的每一列的代碼上執行多次運行的情況(你可以使用聚合而不是aggregateByKey來處理所有數據並將其放入地圖中,但這可能會使你甚至性能較差)。結果將是每個鍵一行,如果你想回到原始記錄(如窗口函數那樣),你實際上需要將該值與原始的RDD結合或者在內部保存所有值並且平坦地圖

我不相信這會爲你提供任何真正的性能改進。你會做很多工作來重新實現在SQL中爲你完成的任務,同時這樣做會失去SQL的大部分優點(催化劑優化,鎢內存管理,全部代碼生成等等)。)

改進SQL

我會做的反而是試圖改善SQL本身。 例如,窗口函數中列的結果對於所有值看起來都是相同的。你真的需要一個窗口功能嗎?你可以做一個groupBy而不是一個窗口函數(如果你真的需要這個記錄,你可以嘗試加入結果。這可能會提供更好的性能,因爲它不一定意味着在每一步中都洗牌兩次)。

+0

請參閱http://stackoverflow.com/questions/41169873/spark-dynamic-dag-is-a-lot-slower-and-different-from-hard-coded-dag以及我的編輯以上。最初,我開始使用group-by和join。這導致一項工作沒有在合理時間內完成/晶石似乎沒有執行任何操作。雖然聯接解決方案適用於小數據,但我無法使其與許多列一起工作。期待如何改進SQL的建議。 –

+0

我不是說連接就是解決方案。我說的是,在大多數情況下,使用aggregateByKey的RDD會比較慢。你可以繼續使用我展示的鏈接和如何實現它的基本邏輯來嘗試aggregateByKey。 –

+0

同時,您是否看到一種不使用慢窗口函數的方法,但仍然阻止使用連接? –