我使用星火1.6.1並遇到一個奇怪的行爲更:我運行的UDF與包含數據幀的一些重計算(物理學模擬)一些輸入數據,並建立一個結果 - 包含許多列(〜40)的數據幀。星火UDF稱爲不是每一次的記錄時,DF有太多的列
奇怪的是,我的UDF被稱爲比按我的輸入數據幀的記錄再一次在此情況下(1.6倍更多的時候),這是我不能接受的,因爲它非常昂貴的。如果我減少了列數(例如20),那麼這種行爲就會消失。
我設法寫下一個小腳本演示了這一點:
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.functions.udf
object Demo {
case class Result(a: Double)
def main(args: Array[String]): Unit = {
val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val numRuns = sc.accumulator(0) // to count the number of udf calls
val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)})
val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id")
// get results of UDF
var results = data
.withColumn("tmp", myUdf($"id"))
.withColumn("result", $"tmp.a")
// add many columns to dataframe (must depend on the UDF's result)
for (i <- 1 to 42) {
results=results.withColumn(s"col_$i",$"result")
}
// trigger action
val res = results.collect()
println(res.size) // prints 100
println(numRuns.value) // prints 160
}
}
現在,有沒有辦法解決這個不降低的列數的方法嗎?
這實際工作!我仍然等待接受答案,也許有人有一個全面的答案 –
是啊,我很好奇 - 完全沒問題,你不接受:) –