2017-01-02 27 views
1

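Spark: apply a function to columns in parallel

Spark processes the data in parallel, but not the operations. In my DAG I want to call a function per column, as in the question "Spark processing columns in parallel"; the values for each column can be calculated independently of the other columns. Is there any way to achieve this kind of parallelism through the spark-SQL API? Using window functions, as discussed in "Spark dynamic DAG is a lot slower and different from hard coded DAG", helped to optimize the DAG a lot, but it still executes serially.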

An example with a little more information can be found at https://github.com/geoHeil/sparkContrastCoding

The minimal example:

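import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// Assumes a SparkSession named `spark` is in scope (as in spark-shell).
import spark.implicits._

val df = Seq(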
    (0, "A", "B", "C", "D"), 
    (1, "A", "B", "C", "D"), 
    (0, "d", "a", "jkl", "d"), 
    (0, "d", "g", "C", "D"), 
    (1, "A", "d", "t", "k"), 
    (1, "d", "c", "C", "D"), 
    (1, "c", "B", "C", "D") 
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 

val inputToDrop = Seq("col3TooMany") 
val inputToBias = Seq("col1", "col2") 

val targetCounts = df.filter(df("TARGET") === 1).groupBy("TARGET").agg(count("TARGET").as("cnt_foo_eq_1")) 
val newDF = df.toDF.join(broadcast(targetCounts), Seq("TARGET"), "left") 
newDF.cache 

// The default for `target` must be a value that exists; the label column is "TARGET".
def handleBias(df: DataFrame, colName: String, target: String = "TARGET") = { 
    val w1 = Window.partitionBy(colName) 
    val w2 = Window.partitionBy(colName, target) 

    df.withColumn("cnt_group", count("*").over(w2)) 
     .withColumn("pre2_" + colName, mean(target).over(w1)) 
     .withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D))) 
     .drop("cnt_group") 
    } 

val joinUDF = udf((newColumn: String, newValue: String, codingVariant: Int, results: Map[String, Map[String, Seq[Double]]]) => { 
    results.get(newColumn) match { 
     case Some(tt) => { 
     val nestedArray = tt.getOrElse(newValue, Seq(0.0)) 
     if (codingVariant == 0) { 
      nestedArray.head 
     } else { 
      nestedArray.last 
     } 
     } 
     case None => throw new Exception("Column not contained in initial data frame") 
    } 
    }) 

Now I want to apply my handleBias function to every column. Unfortunately, the per-column calls are not executed in parallel.

val res = (inputToDrop ++ inputToBias).toSet.foldLeft(newDF) { 
    (currentDF, colName) => 
     { 
     logger.info("using col " + colName) 
     handleBias(currentDF, colName) 
     } 
    } 
    .drop("cnt_foo_eq_1") 

val combined = ((inputToDrop ++ inputToBias).toSet).foldLeft(res) { 
    (currentDF, colName) => 
     { 
     currentDF 
      .withColumn("combined_" + colName, map(col(colName), array(col("pre_" + colName), col("pre2_" + colName)))) 
     } 
    } 

val columnsToUse = combined 
    .select(combined.columns 
     .filter(_.startsWith("combined_")) 
     map (combined(_)): _*) 

val newNames = columnsToUse.columns.map(_.split("combined_").last) 
val renamed = columnsToUse.toDF(newNames: _*) 

val cols = renamed.columns 
val localData = renamed.collect 

val columnsMap = cols.map { colName => 
    colName -> localData.flatMap(_.getAs[Map[String, Seq[Double]]](colName)).toMap 
}.toMap 

Answer

2

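"the values for each column can be calculated independently of the other columns"

While that is true, it doesn't really help in your case. You can generate a number of independent DataFrames, each with its own additions, but that doesn't mean you can automatically combine them into a single execution plan.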

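Each application of handleBias shuffles your data twice, and the output DataFrames don't have the same data distribution as the parent DataFrame. This is why, when you fold over the list of columns, each addition has to be performed separately.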
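One way to check the shuffles is to print the physical plan and look for Exchange operators. The following is a minimal sketch, assuming Spark 2.x or later running in local mode. The object name `ShufflePlanSketch`, the helper `planOf`, and the reduced two-window function are illustrative assumptions and are not taken from the question's repository.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, mean}

// Hypothetical helper object, not part of the original question.
object ShufflePlanSketch {
  // Reduced version of handleBias: the same two window partitionings,
  // without the final ratio column.
  def twoWindows(df: DataFrame, colName: String, target: String): DataFrame = {
    val w1 = Window.partitionBy(colName)
    val w2 = Window.partitionBy(colName, target)
    df.withColumn("cnt_group", count("*").over(w2))
      .withColumn("pre2_" + colName, mean(target).over(w1))
  }

  // Physical plan as text; "Exchange hashpartitioning" nodes are shuffles.
  def planOf(df: DataFrame): String = df.queryExecution.executedPlan.toString

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("shuffle-plan-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((0, "A"), (1, "A"), (0, "d"), (1, "c")).toDF("TARGET", "col1")
    println(planOf(twoWindows(df, "col1", "TARGET")))
    spark.stop()
  }
}
```

Running `twoWindows` in a fold over several columns and printing the plan again should show the Exchange operators accumulating with each added column.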

Theoretically, you could design a pipeline that can be expressed (in pseudocode) like this:

  • Add a unique id:

    df_with_id = df.withColumn("id", unique_id()) 
    
  • Compute each DataFrame independently and convert it to long format:

    dfs = for (c in columns) 
        yield handle_bias(df, c).withColumn(
        "pres", explode([(pre_name, pre_value), (pre2_name, pre2_value)]) 
    ) 
    
  • UNION ALL the partial results:

    combined = dfs.reduce(union) 
    
  • Pivot to convert from long to wide format:

    combined.groupBy("id").pivot("pres._1").agg(first("pres._2")) 
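The four pseudocode steps above could be sketched in Scala roughly as follows. This is an illustration under assumptions, not the answer's implementation: `longStats` is a simplified stand-in for handleBias that only computes a per-level mean and count, the object and method names are hypothetical, and `monotonically_increasing_id` plays the role of `unique_id()`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical sketch of the pseudocode pipeline; all names are illustrative.
object LongToWideSketch {
  // Simplified stand-in for handleBias: per-level mean and count of the
  // target, emitted in long format as (id, name, value) rows.
  def longStats(df: DataFrame, c: String, target: String): DataFrame = {
    val w = Window.partitionBy(c)
    df.withColumn("m", mean(col(target)).over(w))
      .withColumn("n", count(lit(1)).over(w).cast("double"))
      .select(col("id"), explode(array(
        struct(lit("pre2_" + c).as("name"), col("m").as("value")),
        struct(lit("cnt_" + c).as("name"), col("n").as("value"))
      )).as("pres"))
      .select(col("id"), col("pres.name").as("name"), col("pres.value").as("value"))
  }

  def run(df: DataFrame, columns: Seq[String], target: String): DataFrame = {
    // 1. Unique id. The generated ids are not stable across recomputation,
    //    so the frame is cached before it is reused in several branches.
    val withId = df.withColumn("id", monotonically_increasing_id()).cache()
    // 2. One independent long-format frame per column.
    val dfs = columns.map(c => longStats(withId, c, target))
    // 3. UNION ALL of the partial results.
    val combined = dfs.reduce(_ union _)
    // 4. Pivot from long to wide, then attach the original columns again.
    val wide = combined.groupBy("id").pivot("name").agg(first("value"))
    withId.join(wide, Seq("id"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("long-to-wide-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((0, "A", "B"), (1, "A", "B"), (0, "d", "a"), (1, "d", "c"))
      .toDF("TARGET", "col1", "col2")
    run(df, Seq("col1", "col2"), "TARGET").show()
    spark.stop()
  }
}
```

The per-column branches are independent in the plan, but each one still shuffles, and the pivot adds another aggregation on top.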
    

However, I doubt it is worth all the fuss. The process you use is already extremely heavy and requires significant network and disk I/O.

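If the total number of levels (sum(count(distinct x)) for x in columns) is relatively low, you can try to compute all the statistics in a single pass using, for example, aggregateByKey with Map[Tuple2[_, _], StatCounter]. Otherwise, consider downsampling to a level where you can compute the statistics locally.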
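A minimal sketch of the single-pass idea, under stated assumptions: it keys the RDD by (column name, level) and keeps one `StatCounter` per key, which is a variant of the map-of-counters layout mentioned above rather than a literal implementation of it. The object name `SinglePassStatsSketch` and the method `levelStats` are hypothetical, and the target column is assumed to be an integer column, as in the question's example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.util.StatCounter

// Hypothetical sketch; names are illustrative and not from the question.
object SinglePassStatsSketch {
  // Collects count, mean, etc. of the target for every (column, level) pair
  // with one pass over the data and a single aggregation.
  // Assumes `target` is an Int column and the listed columns are Strings.
  def levelStats(df: DataFrame, columns: Seq[String],
                 target: String): Map[(String, String), StatCounter] =
    df.rdd
      .flatMap { row =>
        val t = row.getAs[Int](target).toDouble
        // Emit one record per categorical column of the row.
        columns.map(c => ((c, row.getAs[String](c)), t))
      }
      .aggregateByKey(new StatCounter())(
        (acc, v) => acc.merge(v), // fold a value into the partition-local counter
        (a, b) => a.merge(b))     // combine counters from different partitions
      .collectAsMap()
      .toMap

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("single-pass-stats-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((0, "A", "B"), (1, "A", "B"), (0, "d", "a"), (1, "c", "B"))
      .toDF("TARGET", "col1", "col2")
    levelStats(df, Seq("col1", "col2"), "TARGET").foreach {
      case ((c, level), s) => println(s"$c=$level count=${s.count} mean=${s.mean}")
    }
    spark.stop()
  }
}
```

The resulting map is small when the number of levels is low, so it can be broadcast or used in a lookup UDF such as joinUDF from the question.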