
I'm using the pyspark shell and trying to read data from S3 with Spark's file wildcard support, but I'm getting the error below while loading the files:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Python version 2.7.6 (default, Jul 24 2015 16:07:07) 
SparkContext available as sc. 
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", 'AWS_ACCESS_KEY_ID') 
>>> sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", 'AWS_SECRET_ACCESS_KEY') 
>>> sc.textFile("s3n://myBucket/path/files-*", use_unicode=False).count() 
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(37645) called with curMem=83944, maxMem=278019440 
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 36.8 KB, free 265.0 MB) 
16/01/07 18:03:02 INFO MemoryStore: ensureFreeSpace(5524) called with curMem=121589, maxMem=278019440 
16/01/07 18:03:02 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 5.4 KB, free 265.0 MB) 
16/01/07 18:03:02 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on salve1:48235 (size: 5.4 KB, free: 265.1 MB) 
16/01/07 18:03:02 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0 
16/01/07 18:03:02 INFO SparkContext: Created broadcast 2 from textFile at NativeMethodAccessorImpl.java:-2 
16/01/07 18:03:03 WARN RestS3Service: Response '/path' - Unexpected response code 404, expected 200 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 819, in count 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 810, in sum 
    return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) 
    File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 715, in reduce 
    vals = self.mapPartitions(func).collect() 
    File "/spark-1.2.0-bin-1.0.4/python/pyspark/rdd.py", line 676, in collect 
    bytesInJava = self._jrdd.collect().iterator() 
    File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/spark-1.2.0-bin-1.0.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o65.collect. 
: org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler 
     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:197) 
     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166) 
     at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) 
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) 
     at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source) 
     at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375) 
     at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842) 
     at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902) 
     at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032) 
     at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987) 
     at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177) 
     at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208) 
     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) 
     at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) 
     at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:57) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1352) 
     at org.apache.spark.rdd.RDD.collect(RDD.scala:780) 
     at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:309) 
     at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:32) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
     at py4j.Gateway.invoke(Gateway.java:259) 
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
     at py4j.commands.CallCommand.execute(CallCommand.java:79) 
     at py4j.GatewayConnection.run(GatewayConnection.java:207) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: org.jets3t.service.S3ServiceException: Failed to sanitize XML document destined for handler class org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler 
     at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:179) 
     at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198) 
     at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090) 
     at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056) 
     at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328) 
     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181) 
     ... 44 more 
Caused by: java.lang.OutOfMemoryError: Java heap space 
     at java.util.Arrays.copyOf(Arrays.java:3332) 
     at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137) 
     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121) 
     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:569) 
     at java.lang.StringBuffer.append(StringBuffer.java:369) 
     at org.jets3t.service.impl.rest.XmlResponsesSaxParser.sanitizeXmlDocument(XmlResponsesSaxParser.java:160) 
     at org.jets3t.service.impl.rest.XmlResponsesSaxParser.parseListBucketObjectsResponse(XmlResponsesSaxParser.java:198) 
     at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsInternal(RestS3Service.java:1090) 
     at org.jets3t.service.impl.rest.httpclient.RestS3Service.listObjectsChunkedImpl(RestS3Service.java:1056) 
     at org.jets3t.service.S3Service.listObjectsChunked(S3Service.java:1328) 
     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:181) 
     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:166) 
     at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) 
     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) 
     at org.apache.hadoop.fs.s3native.$Proxy7.list(Unknown Source) 
     at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:375) 
     at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:842) 
     at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:902) 
     at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1032) 
     at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987) 
     at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177) 
     at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208) 
     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:201) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:203) 
     at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) 

When I try to load a single file (without the wildcard), the code works. Since I need to read roughly 100k files, I'd like to know the best way to load all of them into an RDD.


Update

The problem is that the key prefix I'm using points at an S3 "directory" that holds all of my files, about 300k of them as far as I can tell. The file names carry a date as a suffix:

s3://myBucket/path/files-2016-01-01-02-00 
s3://myBucket/path/files-2016-01-01-02-01 
s3://myBucket/path/files-2016-01-01-03-00 
s3://myBucket/path/files-2016-01-01-03-01 

I tried using a wildcard to pick up only the files for one date, s3n://myBucket/path/files-2016-01-01-03-*. When I turned on debug logging, I saw that Spark was listing every file under the S3 "directory" (s3://myBucket/path/), not just the files matching the key prefix I specified (s3://myBucket/path/files-2016-01-01-03-). So even though I only want to read 2 files, all 300k files get listed, and that is most likely what causes the out-of-memory error.
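
To confirm that it is the listing itself, rather than the file contents, that exhausts memory, the keys under each prefix can be counted outside of Spark. A minimal sketch, assuming the boto library is installed and using the same placeholder credentials and bucket names as above:

import boto

conn = boto.connect_s3('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
bucket = conn.get_bucket('myBucket')
# bucket.list() streams the listing page by page, so this just counts keys without holding them all
print(sum(1 for _ in bucket.list(prefix='path/files-')))                 # the whole ~300k-key "directory"
print(sum(1 for _ in bucket.list(prefix='path/files-2016-01-01-03-')))   # only the files I actually want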

Answers

2

I listed my files from S3 directly and then built the RDD from the exact file names, and it has worked for me so far:

import subprocess

raw_file_list = subprocess.Popen("env AWS_ACCESS_KEY_ID='myId' AWS_SECRET_ACCESS_KEY='myKey' aws s3 ls s3://myBucket/path/files-2016-01-01-02", shell=True, stdout=subprocess.PIPE).stdout.read().strip().split('\n')
# The key name is the fourth column of each 'aws s3 ls' line; turn each into a full s3n:// URI
s3_file_list = sc.parallelize(raw_file_list).map(lambda line: "s3n://myBucket/path/%s" % line.split()[3]).collect()
rdd = sc.textFile(','.join(s3_file_list), use_unicode=False)
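
Passing the comma-separated list works because textFile, like the underlying Hadoop FileInputFormat, accepts multiple paths in a single string, so Spark only has to resolve the explicitly named files instead of globbing against the 300k-key prefix.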
1

This is what is throwing the out-of-memory error. So try restricting the pattern to fewer files first and see whether that resolves the problem.
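
For instance, narrowing the glob to a single hour (using the file names from the question) might look like:

sc.textFile("s3n://myBucket/path/files-2016-01-01-02-*", use_unicode=False).count()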

+0

I tried a pattern that matches only two files and it still gives the same error. Also, the total size of the two files I'm trying to load is 150 KB, which should not cause an out-of-memory error.

0

Spark has an annoying problem loading lots of small files together, because it broadcasts some data for every one of them. That may be fixed in 1.6.0, which was released a few days ago. For now, I would have your code load each file separately and union the RDDs together.
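
A rough sketch of that per-file approach, reusing the s3_file_list built in the first answer (with ~100k files, both the driver-side loop and the union of that many RDDs will be slow):

# One RDD per file, then a single union; SparkContext.union takes a list of RDDs
rdds = [sc.textFile(path, use_unicode=False) for path in s3_file_list]
rdd = sc.union(rdds)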

The solution I use is to move all the files into a single directory on S3 and then pass that as a glob, e.g. s3n://myBucket/path/input-files/*. That way, as far as Spark is concerned, you are loading only a single path, and it does not create a broadcast variable for every file under it.
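
One way to do that move server-side, sketched with the boto library (the input-files/ prefix is just the example from this answer, and the credential placeholders are the same as above):

import boto

conn = boto.connect_s3('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
bucket = conn.get_bucket('myBucket')
# Copy each matching key under the dedicated prefix (a server-side copy), then delete the original
for key in bucket.list(prefix='path/files-'):
    bucket.copy_key('path/input-files/' + key.name.split('/')[-1], 'myBucket', key.name)
    key.delete()

rdd = sc.textFile("s3n://myBucket/path/input-files/*", use_unicode=False)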