
PySpark throws java.io.EOFException when reading large files with boto3

I'm using boto3 to read files from S3, and it appears to be much faster than sc.textFile(...). The files are roughly 300 MB to 1 GB each. The process looks like this:

data = sc.parallelize(list_of_files, numSlices=n_partitions) \ 
    .flatMap(read_from_s3_and_split_lines) 

events = data.aggregateByKey(...) 
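
For context, read_from_s3_and_split_lines is not shown in the question; below is a minimal sketch of what such a helper might look like, assuming "bucket/key"-style paths and UTF-8 text (this is an illustration, not the asker's actual code):

import boto3

def read_from_s3_and_split_lines(s3_path):
    # Hypothetical path format: "bucket-name/path/to/object"
    bucket, key = s3_path.split("/", 1)
    # Create the client inside the function so it is built on each executor
    # rather than pickled from the driver.
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"]
    # The whole object is read into memory at once; for files close to 1 GB
    # this is exactly where a Python worker can run out of memory.
    return body.read().decode("utf-8").splitlines()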

When I run this process, I get the following exception:

15/12/04 10:58:00 WARN TaskSetManager: Lost task 41.3 in stage 0.0 (TID 68, 10.83.25.233): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:203) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readInt(DataInputStream.java:392) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139) 
    ... 15 more 

Much of the time, only a few tasks crash and the job is able to recover. Sometimes, though, the whole job crashes after these errors occur. I haven't been able to find the root cause, and the problem seems to appear and disappear depending on how many files I read and exactly which transformations I apply... It never fails when reading a single file.

Answers


I've run into a similar problem, and my investigation showed that the cause was the Python process running out of available memory: Spark had taken all of the memory, and the Python process (where the PySpark work happens) crashed.

Some suggestions:

  1. add machines with more memory,
  2. unpersist RDDs that are no longer needed,
  3. manage memory more carefully (Spark also has settings to limit its memory usage); see the sketch after this list.
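
A minimal sketch of suggestions 2 and 3 in PySpark; the config values are placeholders to illustrate the knobs, not tuned recommendations:

from pyspark import SparkConf, SparkContext

# Placeholder values: give the JVM executors more headroom and cap how much
# memory each Python worker may use before it spills to disk.
conf = (SparkConf()
        .set("spark.executor.memory", "4g")
        .set("spark.python.worker.memory", "512m"))
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(1000000)).cache()
rdd.count()  # materialize the cache

# Suggestion 2: release cached RDDs as soon as they are no longer needed, so
# the memory becomes available to the Python worker processes.
rdd.unpersist()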