
IllegalStateException on Google Cloud Dataproc

I am trying to write a simple vanilla collaborative filtering application that runs on Google Cloud Dataproc. The data is stored in BigQuery. I implemented it following this tutorial: https://cloud.google.com/dataproc/docs/tutorials/bigquery-sparkml

The problem is that when I run this (slightly modified) example, I get an IllegalStateException. More specifically, here is the stack trace:

17/09/25 10:55:37 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 
Traceback (most recent call last): 
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 109, in <module> 
compute_recommendations() 
File "/tmp/af84ad68-0259-4ca1-b464-a118a96f0742/marketing-pages-collaborative-filtering.py", line 59, in compute_recommendations 
conf=conf) 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 646, in newAPIHadoopRDD 
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, marketing-pages-collaborative-filtering-w-1.c.dg-dev-personalization.internal): java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1! 
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:748) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.take(RDD.scala:1298) 
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203) 
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:582) 
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.lang.IllegalStateException: Found known file 'data-000000000002.json' with index 2, which isn't less than or equal to than endFileNumber 1! 
    at com.google.cloud.hadoop.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:197) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.setEndFileMarkerFile(DynamicFileListRecordReader.java:327) 
    at com.google.cloud.hadoop.io.bigquery.DynamicFileListRecordReader.nextKeyValue(DynamicFileListRecordReader.java:177) 
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:182) 
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1324) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

17/09/25 10:55:37 INFO org.spark_project.jetty.server.ServerConnector: Stopped [email protected]{HTTP/1.1}{0.0.0.0:4040} 
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [af84ad68-0259-4ca1-b464-a118a96f0742] entered state [ERROR] while waiting for [DONE]. 

I think I have identified the problem, but I cannot find its cause. The relevant code snippet is this:

# Read the BigQuery export as an RDD of (key, JSON string) pairs.
table_rdd = spark.sparkContext.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "com.google.gson.JsonObject",
    conf=conf)

# Keep only the JSON string from each (key, value) pair and parse it into a DataFrame.
table_json = table_rdd.map(lambda x: x[1])
visit_data = sparkSession.read.json(table_json)

First I create the RDD following Google's tutorial. The next step is to extract the JSON elements from the RDD and read them into a table so that it can be queried. The stack trace points at the line where conf is assigned, but the code works until I call sparkSession.read.json(table_json), because, as I understand it, Spark evaluates lazily and only at that point tries to access the actual JSON files exported from BigQuery.

The problem now is that Spark finds more JSON files than there should be. According to a comment in the BigQuery Hadoop library code, the minimum is 2 files even if everything fits into a single shard, so that BigQuery can recognize the export as complete. It also says that an end-marker file is generated, which as far as I can tell is just an empty JSON file.

However, when I run the code, the export generated by BigQuery contains more than the 2 required files (1 with the data and 1 as the end marker). It generates up to 5 JSON files, even though the table sometimes only contains 1 or 2 rows in BigQuery.

I am fairly sure this is the problem: the export is somehow wrong. But I cannot figure out why it happens or how to fix it. Any help is appreciated.

Update:

I tried something else. I deleted the table in BigQuery and populated it again from scratch. That fixed the export problem; now there are only the two files. But I think the underlying issue persists. I will try adding some rows via a Cloud Function (which is how it happens in my application) and report how the behavior changes.

Update 2:

So after waiting a day and adding some rows via streaming inserts from a Cloud Function, the problem occurs again. Somehow the export is partitioned by day. This would not be a problem if each day got its own shard, but that is not what happens.


Do you have a BigQuery job id you can share? You can also contact the Google team directly via [email protected] to share your project id. Your assessment is correct: there should not be any file numbers beyond the "end-marker" zero-length file. Are you adding rows with BigQuery "streaming inserts", or with heavyweight "load" jobs? –


@DennisHuo Thanks for the reply. I have contacted the Google team, but they are taking a long time to deal with the issue. For inserting we use streaming inserts, via Cloud Functions. Somehow the additional files only show up after waiting a day and adding extra rows. –


I'm not sure running queries against BQ is a good approach for building an ML system. In [this project](https://github.com/WillianFuks/PySpark-RecSys) that I created you can see that I have exporters that run queries in BQ and export the results to GCS, and then I read them in Spark. I never had any problems and it works very fast (and it avoids queries that always cost money; with this approach they run only once). Coincidentally, it also implements a recommender system, but using the DIMSUM algorithm. –
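
For reference, a rough sketch of the workflow that comment describes (the bucket, path, and table names below are hypothetical; the export itself could be done with a BigQuery extract job or the bq CLI):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read-bq-export").getOrCreate()

# Assumes the BigQuery query results were exported to GCS beforehand as
# newline-delimited JSON, e.g. with:
#   bq extract --destination_format NEWLINE_DELIMITED_JSON \
#       my_dataset.my_results gs://my-bucket/exports/results-*.json
visit_data = spark.read.json("gs://my-bucket/exports/results-*.json")
visit_data.printSchema()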

Answers


This is a bug in BigQuery (it returns output file count statistics that do not include the zero-record file). A fix for the issue has been submitted, and its rollout will be complete within about a week.

In the meantime, a workaround is to set the flag "mapred.bq.input.sharded.export.enable" (also known as ENABLE_SHARDED_EXPORT_KEY) to false in the Hadoop configuration when configuring your Dataproc job.
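
Translated into the PySpark setup from the question, the workaround would look roughly like this; the project, bucket, dataset, and table values are placeholders taken from the tutorial's configuration keys, and only the sharded-export flag is the new part:

conf = {
    # Same input settings as in the tutorial/question (values are placeholders).
    "mapred.bq.project.id": "my-project",
    "mapred.bq.gcs.bucket": "my-staging-bucket",
    "mapred.bq.temp.gcs.path": "gs://my-staging-bucket/tmp/bq-export",
    "mapred.bq.input.project.id": "my-project",
    "mapred.bq.input.dataset.id": "my_dataset",
    "mapred.bq.input.table.id": "my_table",
    # Workaround: disable the sharded export (ENABLE_SHARDED_EXPORT_KEY).
    "mapred.bq.input.sharded.export.enable": "false",
}

table_rdd = spark.sparkContext.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "com.google.gson.JsonObject",
    conf=conf)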

UPDATE:
As of today, October 6, 2017, the fix has been fully rolled out in BigQuery.


Is there a ticket or bug report for this? I'm running into this issue when trying to read data from a table using the BigQuery connector with PySpark on Dataproc. – zo7


We don't have a public bug report filed yet, but if you are still seeing this it may be a corner case that produces the same error once a table reaches a certain state. We have an internal bug tracking it, but feel free to file a bug report against BigQuery if you think that would be useful. In the meantime, you can unblock yourself with the workaround suggested in the answer. Thanks! –
