2016-10-29 47 views
6

我使用星火1.6.1並遇到一個奇怪的行爲更:我運行的UDF與包含數據幀的一些重計算(物理學模擬)一些輸入數據,並建立一個結果 - 包含許多列(〜40)的數據幀。星火UDF稱爲不是每一次的記錄時,DF有太多的列

奇怪的是,我的UDF被稱爲比按我的輸入數據幀的記錄再一次在此情況下(1.6倍更多的時候),這是我不能接受的,因爲它非常昂貴的。如果我減少了列數(例如20),那麼這種行爲就會消失。

我設法寫下一個小腳本演示了這一點:

import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.functions.udf 


object Demo { 

    case class Result(a: Double) 

    def main(args: Array[String]): Unit = { 

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[*]")) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val numRuns = sc.accumulator(0) // to count the number of udf calls 

    val myUdf = udf((i:Int) => {numRuns.add(1);Result(i.toDouble)}) 

    val data = sc.parallelize((1 to 100), numSlices = 5).toDF("id") 

    // get results of UDF 
    var results = data 
     .withColumn("tmp", myUdf($"id")) 
     .withColumn("result", $"tmp.a") 


    // add many columns to dataframe (must depend on the UDF's result) 
    for (i <- 1 to 42) { 
     results=results.withColumn(s"col_$i",$"result") 
    } 

    // trigger action 
    val res = results.collect() 
    println(res.size) // prints 100 

    println(numRuns.value) // prints 160 

    } 
} 

現在,有沒有辦法解決這個不降低的列數的方法嗎?

回答

4

我真的不能解釋這種行爲 - 但顯然查詢計劃莫名其妙地選擇其中的一些記錄被計算兩次的路徑。這意味着如果我們緩存中間結果(在應用UDF之後),我們可能能夠「強制」Spark而不重新計算UDF。事實上,一旦緩存被添加它的行爲與預期 - UDF只調用100次:

// get results of UDF 
var results = data 
    .withColumn("tmp", myUdf($"id")) 
    .withColumn("result", $"tmp.a").cache() 

當然,緩存有它自己的成本(內存...),但它最終可能會在您的案件有益如果它節省了很多UDF調用。

+0

這實際工作!我仍然等待接受答案,也許有人有一個全面的答案 –

+0

是啊,我很好奇 - 完全沒問題,你不接受:) –

4

我們以前有大約一年的這個同樣的問題,花了大量的時間,直到我們終於想通了什麼問題。

我們也有一個非常昂貴的UDF計算,我們發現,它被一次又一次地計算每個我們指的是其列的時間。它只是發生在我們身上再前幾天,所以我決定開這個錯誤: SPARK-18748

我們也在這裏開了一個問題的話,但現在我看到標題不太好: Trying to turn a blob into multiple columns in Spark

我同意Tzach關於以某種方式「強制」計算UDF的計劃。我們這樣做是醜陋,但我們必須,因爲我們不能緩存()中的數據 - 這是太大了:

val df = data.withColumn("tmp", myUdf($"id")) 
val results = sqlContext.createDataFrame(df.rdd, df.schema) 
      .withColumn("result", $"tmp.a") 

更新:

現在,我看到我的JIRA票被鏈接到另一個之一:SPARK-17728,仍然沒有真正解決這個問題的正確方法,但它周圍給人一種多個可選的工作:

val results = data.withColumn("tmp", explode(array(myUdf($"id")))) 
        .withColumn("result", $"tmp.a") 
+0

感謝分享! – twoface88