我正在嘗試編寫一個簡單的vanilla協作過濾應用程序,運行在Google Cloud Dataproc上。 該數據位於BigQuery中。 我已經實現了這個根據本教程:https://cloud.google.com/dataproc/docs/tutorials/bigquery-sparkmlGoogle雲Dataproc上的IllegalStateException
現在的問題是,當運行這個(稍加修改)的例子,我得到一個IllegalStateException。更具體地說,這裏是堆棧跟蹤:
17/09/25 10:55:37 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Traceback (most recent call last):
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 109, in <module>
compute_recommendations()
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 59, in compute_recommendations
conf=conf)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 646, in newAPIHadoopRDD
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, marketing-pages-collaborative-filtering-w-1.c.dg-dev-personalization.internal): java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1!
at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197)
at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327)
at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1324)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.take(RDD.scala:1298)
at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:582)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1!
at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197)
at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327)
at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
17/09/25 10:55:37 INFO org.spark_project.jetty.server.ServerConnector: Stopped [email protected]{HTTP/1.1}{0.0.0.0:4040}
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [af84ad68-0259-4ca1-b464-a118a96f0742] entered state [ERROR] while waiting for [DONE].
我想我已經確定了問題,但找不到問題的原因。相關的代碼片段是這樣的:
table_rdd = spark.sparkContext.newAPIHadoopRDD(
"com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
"org.apache.hadoop.io.LongWritable",
"com.google.gson.JsonObject",
conf=conf)
table_json = table_rdd.map(lambda x: x[1])
visit_data = sparkSession.read.json(table_json)
首先我根據Google的教程創建RDD。下一步是從RDD中提取JSON元素,然後將其讀入表中,以便查詢。 stacktrace顯示分配conf時會發生異常,但是代碼有效,直到我調用sparkSession.read.json(table_json)
,因爲據我瞭解,spark會延遲工作,然後纔會嘗試訪問從BigQuery中導出的實際JSON文件。
現在的問題是,Spark發現比應該有更多的JSON文件。 根據BigQuery Hadoop庫代碼中的comment,即使所有內容都符合一個分片,最小值爲2,這樣BigQuery就可以識別導出。另外它說那裏它生成一個所謂的結束標記文件,據我所知,它只是一個空的JSON文件。
但是,運行代碼時,由BigQuery生成的導出超過了2個必需文件(1個包含數據,1個作爲結束標記)。它最多可生成5個JSON文件,有時僅包含BigQuery中的1或2行。
我很確定這是問題,出口不知何故是錯誤的。但我無法找出發生這種情況的原因以及如何解決這個問題。任何幫助表示讚賞。
更新:
我試過別的東西。我在BigQuery中刪除了表格,並從頭開始填充它。這解決了出口的問題。現在只有兩個文件。但我認爲問題仍然存在。我會嘗試通過雲功能(這會發生在我的應用程序中)添加一些行,然後更新行爲。
更新2:
所以等待了一天,並通過使用雲功能的流媒體刀片加入一些行之後,這個問題再次發生。不知何故,出口是按天劃分的。如果每一天都有自己的分片,這不會成爲問題,但這不會發生。
你有沒有可以分享的BigQuery jobid?您也可以直接通過[email protected]聯繫Google團隊以分享您的項目ID。您的評估是正確的,在「結束標記」零長度文件之後不應該有任何文件編號。您是使用BigQuery「流媒體插入」添加行,還是使用重量級「加載」作業添加? –
@DennisHuo感謝您的回覆。我已經聯繫過谷歌團隊,但他們花了很多時間來處理這個問題。要插入,我們使用流式插入,通過雲端函數。不知何故,只有等待一天並添加額外的行纔會顯示附加文件。 –
我不知道如果針對BQ運行查詢來構建ML系統是一種好方法。在[這個項目](https://github.com/WillianFuks/PySpark-RecSys)中,我創建了你可以看到我有一些出口商在BQ中運行查詢並將結果導出到GCS,然後我在spark中讀取它們。從來沒有任何問題,它的工作速度非常快(並且它避免了總是花費錢的查詢,這種方法只運行一次)。巧合的是,它也實現了推薦系統,但使用DIMSUM算法。 –