2017-08-02 175 views
0

我建立了一個Spark集羣,所有節點都可以訪問網絡共享存儲,在這裏他們可以訪問要讀取的文件。我在一個python jupyter筆記本上運行它。它在幾天前工作,現在它停止工作,但我不知道爲什麼,或者我改變了什麼。Spark'FileNotFoundException:文件不存在'錯誤(python)

我已經嘗試重新啓動節點和主。

我也試着將csv文件複製到一個新的目錄並指向spark.read,但它仍然給出相同的錯誤。

當我刪除CSV文件,它提供了更短的錯誤說「找不到文件」

任何幫助將不勝感激。

這是我的代碼:

from pyspark.sql import SparkSession 
from pyspark.conf import SparkConf 

spark = SparkSession.builder \ 
    .master("spark://IP:PORT") \ 
    .appName("app_1") \ 
    .config(conf=SparkConf()) \ 
    .getOrCreate() 

df = spark.read.csv("/nas/file123.csv") 
string1 = df.rdd.map(lambda x: x.column1).collect() 

不過,我得到這個錯誤:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-2-12bd938122cd> in <module>() 
    29 
    30 
---> 31 string1 = df.rdd.map(lambda x: x.column1).collect() 
    32 
    33 

/home/hjk/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/rdd.pyc in collect(self) 
    807   """ 
    808   with SCCallSiteSync(self.context) as css: 
--> 809    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    810   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    811 

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

/home/hjk/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    61  def deco(*a, **kw): 
    62   try: 
---> 63    return f(*a, **kw) 
    64   except py4j.protocol.Py4JJavaError as e: 
    65    s = e.java_exception.toString() 

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 3.0 failed 4 times, most recent failure: Lost task 4.3 in stage 3.0 (TID 37, executor 2): java.io.FileNotFoundException: File file:/nas/file123.csv does not exist 
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved. 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:934) 
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453) 
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.io.FileNotFoundException: File file:/nas/file123.csv does not exist 
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved. 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157) 
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112) 
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504) 
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328) 
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951) 
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269) 
+0

你試過提供文件:如果您使用的是NAS的位置。請嘗試。默認情況下,它將嘗試從HDFS獲取文件 –

+0

提供輸入的絕對路徑 –

回答

1

從它看起來是檢查您的本地系統上的文件中的錯誤。只要確保你的文件存在於指定的路徑上。也請嘗試以下建議。

  1. 嘗試用文件URI:文件:///nas/file123.csv
  2. 上傳HDFS上的文件,並嘗試讀取HDFS URI的文件中像HDFS:/// ...

希望這會有所幫助。

問候,

Neeraj

+1

不知何故該節點無權訪問該文件。我重新啓動該服務器,並再次正常工作。謝謝 – msharky