2017-08-17 30 views
0

我新來斯卡拉,我想執行以下代碼:org.apache.spark.SparkException:未能執行用戶定義的函數

val SetID = udf{(c:String, d: String) => 
    if(c.UpperCase.contains("EXKLUS") == true) 
    {d} 
    else {""} 
} 
val ParquetWithID = STG1 
    .withColumn("ID", SetID(col("line_item"), col("line_item_ID"))) 

兩列(line_itemline_item_id)被定義爲StringsSTG1架構。

我收到以下錯誤,當我嘗試運行代碼:

`org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1$$anonfun$2: (string, string) => string) 
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
at org.apache.spark.scheduler.Task.run(Task.scala:86) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

Caused by: java.lang.NullPointerException 
    at MyTests$$anonfun$1$$anonfun$2.apply(MyTests.scala:356) 
    at MyTests$$anonfun$1$$anonfun$2.apply(MyTests.scala:355) 
    ... 16 more 

我也試過c.UpperCase().contains("EXKLUS"),但我得到了同樣的錯誤。 但是,如果我只是運行「if equals」聲明一切正常。所以我想這個問題是在我的udf中使用UpperCase().contains(" ")函數,但我不明白問題來自哪裏。任何幫助將appriciated!

回答

0

如果schema包含作爲

|-- line_item: string (nullable = true) 
|-- line_item_ID: string (nullable = true) 

然後在您的null檢查if語句就可以解決這個問題,因爲(注意,字符串toUpperCase法)

val SetID = udf{(c:String, d: String) => 
    if(c != null && c.toUpperCase.contains("EXKLUS") == true) 
    {d} 
    else {""} 
} 
val ParquetWithID = STG1 
    .withColumn("ID", SetID(col("line_item"), col("line_item_ID"))) 

我希望答案有幫助

+0

這工作!謝謝! – Inna

+0

很高興聽到@Inna和感謝接受:) –