
Spark 2.2: NullPointerException when performing a left outer join with the null-safe equality operator

Versions: Spark 2.2.0, Scala 2.11.8

scala> var d1 = Seq((null, 1), ("a1", 2)).toDF("a", "b") 
scala> d1.show 
+----+---+ 
| a| b| 
+----+---+ 
|null| 1| 
| a1| 2| 
+----+---+ 

scala> var d2 = Seq(("a2", 3)).toDF("a", "b") 
scala> d2.show 
+---+---+ 
| a| b| 
+---+---+ 
| a2| 3| 
+---+---+ 

scala> d1.joinWith(d2, d1("a") <=> d2("a"), "left_outer").show 
17/10/10 09:44:39 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8) 
java.lang.NullPointerException 

Is this behavior expected?

Stack trace

java.lang.NullPointerException 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:108) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 
17/10/10 10:19:28 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException 
    (same stack trace as above) 

17/10/10 10:19:28 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException 
    (same stack trace as above) 
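
For reference, the trace points into whole-stage generated code (WholeStageCodegenExec), which suggests two quick diagnostics: run the same predicate through plain join() instead of joinWith(), and retry joinWith() with whole-stage codegen disabled. A minimal sketch (spark.sql.codegen.wholeStage is a standard Spark SQL setting; treat this as a way to narrow the problem down, not as a fix):

// Same predicate via join() instead of joinWith(): if this succeeds, 
// the NPE is specific to the joinWith code path. 
d1.join(d2, d1("a") <=> d2("a"), "left_outer").show() 

// Retry joinWith() with whole-stage code generation turned off; if the 
// NPE disappears, the generated code is mishandling the null join key. 
spark.conf.set("spark.sql.codegen.wholeStage", false) 
d1.joinWith(d2, d1("a") <=> d2("a"), "left_outer").show() 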

Answer


I could not reproduce the problem on Spark 2.2.0 with Scala 2.11.8: running the same example with both the null-safe operator <=> and the equality operator === raised no exception. Could you check again and add more details about the issue?

val d1 = sc.parallelize(Seq(
    (null, 1), ("a1", 2))
  ).toDF("a", "b")

d1.show 


+----+---+ 
| a| b| 
+----+---+ 
|null| 1| 
| a1| 2| 
+----+---+ 


val d2 = sc.parallelize(Seq(
    ("a2", 3))
  ).toDF("a", "b")

d2.show 

+---+---+ 
| a| b| 
+---+---+ 
| a2| 3| 
+---+---+ 


d1.joinWith(d2, d1("a") <=> d2("a"), "left_outer").show() 

+--------+----+ 
|  _1| _2| 
+--------+----+ 
|[null,1]|null| 
| [a1,2]|null| 
+--------+----+ 

d1.joinWith(d2, d1("a") === d2("a"), "left_outer").show() 



+--------+----+ 
|  _1| _2| 
+--------+----+ 
|[null,1]|null| 
| [a1,2]|null| 
+--------+----+ 
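
As an aside, the symbolic operators used above have named Column-method equivalents, eqNullSafe and equalTo, which some find more readable; a sketch of the same two joins:

// <=> is Column.eqNullSafe; === is Column.equalTo. 
d1.joinWith(d2, d1("a").eqNullSafe(d2("a")), "left_outer").show() 
d1.joinWith(d2, d1("a").equalTo(d2("a")), "left_outer").show() 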

Adding another example:

val x = Seq((100L,null), (102L,"17179869185L"), (101L,"17179869186L"), (200L,"17179869186L"), 
  (401L,"1L"), (500L,"1L"), (600L,"8589934593L"), (700L,"8589934593L"), (800L,"8589934593L"), 
  (900L,"8589934594L"), (1000L,"8589934594L"), (1200L,"2L"), (1300L,"2L"), (1301L,"2L"), 
  (1400L,"17179869187L"), (1500L,"17179869188L"), (1600L,"8589934595L")).toDF("u","x1")

x.show() 


+----+------------+ 
| u|   x1| 
+----+------------+ 
| 100|  null| 
| 102|17179869185L| 
| 101|17179869186L| 
| 200|17179869186L| 
| 401|   1L| 
| 500|   1L| 
| 600| 8589934593L| 
| 700| 8589934593L| 
| 800| 8589934593L| 
| 900| 8589934594L| 
|1000| 8589934594L| 
|1200|   2L| 
|1300|   2L| 
|1301|   2L| 
|1400|17179869187L| 
|1500|17179869188L| 
|1600| 8589934595L| 
+----+------------+ 

val y = Seq(("17179869187L",-8589934595L), ("17179869188L",-8589934595L), ("17179869185L",-8589934593L)).toDF("x2","y")


y.show() 

+------------+-----------+ 
|   x2|   y| 
+------------+-----------+ 
|17179869187L|-8589934595| 
|17179869188L|-8589934595| 
|17179869185L|-8589934593| 
+------------+-----------+ 


x.join(y, 'x1 === 'x2, "left_outer").show() 

+----+------------+------------+-----------+ 
| u|   x1|   x2|   y| 
+----+------------+------------+-----------+ 
| 100|  null|  null|  null| 
| 102|17179869185L|17179869185L|-8589934593| 
| 101|17179869186L|  null|  null| 
| 200|17179869186L|  null|  null| 
| 401|   1L|  null|  null| 
| 500|   1L|  null|  null| 
| 600| 8589934593L|  null|  null| 
| 700| 8589934593L|  null|  null| 
| 800| 8589934593L|  null|  null| 
| 900| 8589934594L|  null|  null| 
|1000| 8589934594L|  null|  null| 
|1200|   2L|  null|  null| 
|1300|   2L|  null|  null| 
|1301|   2L|  null|  null| 
|1400|17179869187L|17179869187L|-8589934595| 
|1500|17179869188L|17179869188L|-8589934595| 
|1600| 8589934595L|  null|  null| 
+----+------------+------------+-----------+ 

x: org.apache.spark.sql.DataFrame = [u: bigint, x1: string] 
y: org.apache.spark.sql.DataFrame = [x2: string, y: bigint] 

I tried your example and got the same result. The example I gave still throws the same error, so perhaps there is a difference between 'sc.parallelize().toDF()' and 'Seq().toDF()'. – rdg
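
One way to test that hypothesis would be to build the same DataFrame both ways and compare the physical plans of the failing join; a difference (for example, a LocalTableScan for the local Seq versus a scan over a parallelized RDD) would explain why only one construction path hits the generated-code NPE. A minimal sketch, assuming a spark-shell session with d2 defined as above:

// Same data, two construction paths. 
val viaSeq = Seq((null: String, 1), ("a1", 2)).toDF("a", "b") 
val viaRdd = sc.parallelize(Seq((null: String, 1), ("a1", 2))).toDF("a", "b") 

// Compare the physical plans rather than running the join. 
viaSeq.joinWith(d2, viaSeq("a") <=> d2("a"), "left_outer").explain() 
viaRdd.joinWith(d2, viaRdd("a") <=> d2("a"), "left_outer").explain() 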