我是scala新手,我正在尋找解決此錯誤的方法。SparkException:由於階段失敗導致作業中止:使用Spark-Graphx時出現NullPointerException
我正在處理的情景是這樣的。我有3個表:
用戶:包含ID和名稱
業務:包含ID和名稱
評論:含user.ID和業務。 ID
只有用戶進行審查,只有商家纔會收到審覈。該圖將是這樣的:
我正在尋找的是:
For each user I want to know the other users that made a review to the same business
我做這個動作來創建圖形:
val users = sqlContext.sql("Select user_id as ID from user")
val business= sqlContext.sql("Select business_id as ID from business")
users.write.mode(SaveMode.Append).saveAsTable("user_busin_db")
business.write.mode(SaveMode.Append).saveAsTable("user_busin_db")
val user_bus = sqlContext.sql("Select ID from user_busin_db")
val reviews = sqlContext.sql("Select user_id, business_id from review")
表user_bus
將用於頂點創建。
之後,我創建的圖形與GraphX
與此代碼:
def str2Long(s: String) = s.##.toLong
val vertex: RDD[(VertexId, String)] = user_bus.rdd.map(x => (str2Long(x(0).asInstanceOf[String]),(x(0).asInstanceOf[String])))
val edge:RDD[Edge[String]] = reviews.rdd.map(row => Edge(str2Long(row(0).asInstanceOf[String]), str2Long(row(1).asInstanceOf[String]), "review"))
val default = "missing"
val myGraph = Graph(vertex, edge, default)
myGraph.cache()
現在來回答我的問題,我嘗試爲eaither用戶和企業使用此代碼做了aggregateMessages:
val userAggregate: VertexRDD[(List[Long])] = myGraph.aggregateMessages[(List[Long])](triplet => {
triplet.sendToSrc((List(triplet.dstId)))
},
(a,b) => (a.union(b))
)
val businessAggregate: VertexRDD[(List[Long])] = myGraph.aggregateMessages[(List[Long])](triplet => {
triplet.sendToDst((List(triplet.srcId)))
},
(a,b) => (a.union(b))
)
然後代碼給了我錯誤。收集每個用戶什麼是做在同一業務的評論,我寫這個的其他用戶:
userAggregate.map(userAggr =>
(userAggr._1, userAggr._2.flatMap(userAggrListElem =>
userAggr._2.patch(0,businessAggregate.filter(busAggr => busAggr._1 == userAggrListElem).map(row => row._2).take(1)(0),userAggr._2.size+1))))
如果我嘗試使用.collect或.Count之間就可以了,我得到這個錯誤:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 138.0 failed 1 times, most recent failure: Lost task 1.0 in stage 138.0 (TID 2807, localhost): java.lang.NullPointerException
at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:94)
at org.apache.spark.graphx.VertexRDD.filter(VertexRDD.scala:98)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:102)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:101)
at scala.collection.immutable.List.flatMap(List.scala:327)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:101)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:100)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1769)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)
at org.apache.spark.rdd.RDD.count(RDD.scala:1134)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:105)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:115)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:117)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:119)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:121)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:123)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:125)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:127)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:129)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:131)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:133)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:135)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:137)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:139)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:141)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:143)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:145)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:147)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw.<init>(<console>:149)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw.<init>(<console>:151)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw.<init>(<console>:153)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw.<init>(<console>:155)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$eval$.$print$lzycompute(<console>:7)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$eval$.$print(<console>:6)
Caused by: java.lang.NullPointerException
at org.apache.spark.graphx.impl.VertexRDDImpl.mapVertexPartitions(VertexRDDImpl.scala:94)
at org.apache.spark.graphx.VertexRDD.filter(VertexRDD.scala:98)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:102)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5$$anonfun$apply$1.apply(<console>:101)
at scala.collection.immutable.List.flatMap(List.scala:327)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:101)
at linea6ec9c0b0ced4184a0288c57eb3bdda585.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$5.apply(<console>:100)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1769)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:314)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
算法效果很好,如果我使用的userAggregate
一個子集,事實上,如果我用take(1)
我得到這樣的結果:
Array[(org.apache.spark.graphx.VertexId, List[Long])] = Array((-1324024017,List(-1851582020, -1799460264, -1614007919, -1573604682, ...)))
哪個是:(USER_ID,列表(user_ID的是做了檢討,相同的業務,...)
現在我認爲有一個頂點的問題,有一個不連接的頂點,它給了我NullPointer錯誤,但我無法找到它並從我的grapf中刪除。我能做些什麼來解決這個問題?
我明白了。所以沒有辦法做我想做的事情?那麼,爲什麼如果我使用'take()'它有效? – TheEnigmist