S3上有一些使用快速壓縮算法(使用node-snappy
包)壓縮的csv文件。我喜歡使用com.databricks.spark.csv
來處理這些文件,但我總是得到一個無效的文件輸入錯誤。Spark com.databricks.spark.csv無法使用node-snappy加載活潑的壓縮文件
代碼:
file_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', codec='snappy', mode='FAILFAST').load('s3://sample.csv.snappy')
錯誤消息:
16/09/24 21:57:25 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-0-32-5.ec2.internal): java.lang.InternalError: Could not decompress data. Input is invalid. at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompressBytesDirect(Native Method) at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompress(SnappyDecompressor.java:239) at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:88) at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85) at java.io.InputStream.read(InputStream.java:101) at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180) at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216) at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174) at org.apache.hadoop.mapred.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:208) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246) at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:255) at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:209) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) at scala.collection.AbstractIterator.to(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1305) at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1305) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
如果你使用的是spark 2.0,試試這個類的csv,像這樣:'df = spark.read.csv('csv.file')' –
spark.read.csv使用'com.databricks.spark.csv '同樣的問題仍然存在。 – farazZ