2017-07-02 96 views
0

我是scala新手,我正在尋找解決此錯誤的方法。SparkException:由於階段失敗導致作業中止:使用Spark-Graphx時出現NullPointerException


我正在處理的情景是這樣的。我有3個表:

  • 用戶:包含ID和名稱

  • 業務:包含ID和名稱

  • 評論:含user.ID和業務。 ID

只有用戶進行審查,只有商家纔會收到審覈。該圖將是這樣的:

Graph example

我正在尋找的是:

For each user I want to know the other users that made a review to the same business

我做這個動作來創建圖形:

val users = sqlContext.sql("Select user_id as ID from user") 
val business= sqlContext.sql("Select business_id as ID from business") 
users.write.mode(SaveMode.Append).saveAsTable("user_busin_db") 
business.write.mode(SaveMode.Append).saveAsTable("user_busin_db") 
val user_bus = sqlContext.sql("Select ID from user_busin_db") 
val reviews = sqlContext.sql("Select user_id, business_id from review") 

user_bus將用於頂點創建。

之後,我創建的圖形與GraphX與此代碼:

def str2Long(s: String) = s.##.toLong 

val vertex: RDD[(VertexId, String)] = user_bus.rdd.map(x => (str2Long(x(0).asInstanceOf[String]),(x(0).asInstanceOf[String]))) 
val edge:RDD[Edge[String]] = reviews.rdd.map(row => Edge(str2Long(row(0).asInstanceOf[String]), str2Long(row(1).asInstanceOf[String]), "review")) 

val default = "missing" 
val myGraph = Graph(vertex, edge, default) 

myGraph.cache() 

現在來回答我的問題,我嘗試爲eaither用戶和企業使用此代碼做了aggregateMessages:

val userAggregate: VertexRDD[(List[Long])] = myGraph.aggregateMessages[(List[Long])](triplet => { 
     triplet.sendToSrc((List(triplet.dstId))) 
    }, 
    (a,b) => (a.union(b)) 
) 

val businessAggregate: VertexRDD[(List[Long])] = myGraph.aggregateMessages[(List[Long])](triplet => { 
     triplet.sendToDst((List(triplet.srcId))) 
    }, 
    (a,b) => (a.union(b)) 
) 

然後代碼給了我錯誤。收集每個用戶什麼是做在同一業務的評論,我寫這個的其他用戶:

userAggregate.map(userAggr => 
    (userAggr._1, userAggr._2.flatMap(userAggrListElem => 
     userAggr._2.patch(0,businessAggregate.filter(busAggr => busAggr._1 == userAggrListElem).map(row => row._2).take(1)(0),userAggr._2.size+1)))) 

如果我嘗試使用.collect或.Count之間就可以了,我得到這個錯誤:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 138.0 failed 1 times, most recent failure: Lost task 1.0 in stage 138.0 (TID 2807, localhost): java.lang.NullPointerException 
at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:94) 
    at org.apache.spark.graphx.VertexRDD.filter(VertexRDD.scala:98) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:102) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:101) 
    at scala.collection.immutable.List.flatMap(List.scala:327) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:101) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:100) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1769) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930) 
    at org.apache.spark.rdd.RDD.count(RDD.scala:1134) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:105) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:115) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:117) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:119) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:121) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:123) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:125) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:127) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:129) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:131) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:135) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:137) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:139) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:141) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:143) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:145) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:147) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw.<init>(<console>:149) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw.<init>(<console>:151) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw.<init>(<console>:153) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw.<init>(<console>:155) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$eval$.$print$lzycompute(<console>:7) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$eval$.$print(<console>:6) 
Caused by: java.lang.NullPointerException 
    at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:94) 
    at org.apache.spark.graphx.VertexRDD.filter(VertexRDD.scala:98) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:102) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:101) 
    at scala.collection.immutable.List.flatMap(List.scala:327) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:101) 
    at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:100) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1769) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

算法效果很好,如果我使用的userAggregate一個子集,事實上,如果我用take(1)我得到這樣的結果:

Array[(org.apache.spark.graphx.VertexId, List[Long])] = Array((-1324024017,List(-1851582020, -1799460264, -1614007919, -1573604682, ...))) 

哪個是:(USER_ID,列表(user_ID的是做了檢討,相同的業務,...)

現在我認爲有一個頂點的問題,有一個不連接的頂點,它給了我NullPointer錯誤,但我無法找到它並從我的grapf中刪除。我能做些什麼來解決這個問題?

回答

0

TL; DR這不是一個有效的Spark代碼。

這是預期的結果。不允許在Apache Spark中嵌套轉換,因此您不能在userAggregate.map的結尾中訪問businessAggregate

+0

我明白了。所以沒有辦法做我想做的事情?那麼,爲什麼如果我使用'take()'它有效? – TheEnigmist

相關問題