0
Spark SQL對我來說很清楚。但是,我剛開始使用Spark的RDD API。作爲spark apply function to columns in parallel指出,這應該讓我獲得spark將spark-SQL轉換爲RDD API
def handleBias(df: DataFrame, colName: String, target: String = this.target) = {
val w1 = Window.partitionBy(colName)
val w2 = Window.partitionBy(colName, target)
df.withColumn("cnt_group", count("*").over(w2))
.withColumn("pre2_" + colName, mean(target).over(w1))
.withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D)))
.drop("cnt_group")
}
}
擺脫緩慢的洗牌在僞代碼:df foreach column (handleBias(column)
所以最小的數據幀裝起來
val input = Seq(
(0, "A", "B", "C", "D"),
(1, "A", "B", "C", "D"),
(0, "d", "a", "jkl", "d"),
(0, "d", "g", "C", "D"),
(1, "A", "d", "t", "k"),
(1, "d", "c", "C", "D"),
(1, "c", "B", "C", "D")
)
val inputDf = input.toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
但沒有正確映射
val rdd1_inputDf = inputDf.rdd.flatMap { x => {(0 until x.size).map(idx => (idx, x(idx)))}}
rdd1_inputDf.toDF.show
它失敗
java.lang.ClassNotFoundException: scala.Any
java.lang.ClassNotFoundException: scala.Any
對於此問題中概述的問題,可以找到一個示例https://github.com/geoHeil/sparkContrastCodinghttps://github.com/geoHeil/sparkContrastCoding/blob/master/src/main/scala/ColumnParallel.scala。
正如我想在毫升使用此.Pipeline和輸出步驟是DataFrame的「模式丟失」,例如我將需要使用模式匹配?它是否正確?但有很多列是否有一種方法來「推斷」它們(部分shcema? –
是的DF => RDD轉換不會使用架構根本不幸的是(我不認爲有這是一種強制使用它的好方法)但是,看一下我的新數據集示例:不需要使用中間數據框Dataframe,它看起來像DataSet可以很好地推斷類型(在Spark 2.0中我認爲任何你可以用DF做的事情也可以用DS來完成) –
@GeorgHeiler(不知道你是否被告知了^^^^) –