2016-10-27 60 views
3

我有一個S3存儲桶,裏面裝滿了沒有文件擴展名的Gz文件。例如s3://mybucket/1234502827-34231Spark - 讀取沒有文件擴展名的壓縮文件

sc.textFile使用該文件擴展名來選擇解碼器。我發現了很多關於處理自定義文件擴展名的博客文章,但沒有提及缺少文件擴展名

我認爲解決方案可能是sc.binaryFiles並手動解壓文件。

另一種可能性是找出sc.textFile如何找到文件格式。我不清楚這些classOf[]調用的工作。

def textFile(
     path: String, 
     minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { 
    assertNotStopped() 
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 
     minPartitions).map(pair => pair._2.toString).setName(path) 
    } 
+0

'sc.textFile'不確定格式。它由'TextInputFormat'完成,只使用擴展名。 –

+0

或重新命名s3中的所有文件,添加'.gz'。我看了一下源代碼,它在這裏實現:https://hadoop.apache.org/docs/stable/api/src-html/org/apache/hadoop/io/compress/CompressionCodecFactory#line.191它確實使用文件擴展名。該規範建議你可以只看第一個字節http://www.zlib.org/rfc-gzip.html#file-format,但是這表明你可以得到誤報,並且必須考慮endian https:// stackoverflow.com/questions/6059302/how-to-check-if-a-file-is-gzip-compressed所以毫無疑問只是使用'.gz'是一個更快,更可靠的約定 – Davos

+0

@ user6022341'TextInputFormat'沒有做它是這個類中的'getCodec(Path file)'方法'org.apache.hadoop.io.compress.CompressionCodecFactory' – Davos

回答

2

您可以嘗試將ZIP文件的以下解決方案與gzipFileInputFormat庫結合使用嗎?

這裏 - How to open/stream .zip files through Spark? 你可以看到如何使用ZIP做到這一點:

rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration()); 

gzipFileInputFormat:

https://github.com/bsankaran/internet_routing/blob/master/hadoop-tr/src/main/java/edu/usc/csci551/tools/GZipFileInputFormat.java

約newAPIHadoopFile(一些細節)可以在這裏找到: http://spark.apache.org/docs/latest/api/python/pyspark.html

+0

謝謝,我給了這幾個小時,並且無法使它工作。 – jspooner

1

我發現了幾個例子,幾乎符合我的需求。這是我用來解析用GZ壓縮的文件的最終代碼。

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream 
import org.apache.spark.input.PortableDataStream 
import scala.util.Try 
import java.nio.charset._ 

def extractBSM(ps: PortableDataStream, n: Int = 1024) = Try { 
    val gz = new GzipCompressorInputStream(ps.open) 
    Stream.continually { 
    // Read n bytes 
    val buffer = Array.fill[Byte](n)(-1) 
    val i = gz.read(buffer, 0, n) 
    (i, buffer.take(i)) 
    } 
    // Take as long as we've read something 
    .takeWhile(_._1 > 0) 
    .map(_._2) 
    .flatten 
    .toArray 
} 
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8) 
val inputFile = "s3://my-bucket/157c96bd-fb21-4cc7-b340-0bd4b8e2b614" 
val rdd = sc.binaryFiles(inputFile).flatMapValues(x => extractBSM(x).toOption).map(x => decode()(x._2)) 
val rdd2 = rdd.flatMap { x => x.split("\n") } 
rdd2.take(10).foreach(println) 
+0

這適用於GZ,但我們確實需要檢查魔術字節並應用正確的壓縮算法 – jspooner

+0

您應該對源代碼進行歸因。此外,這僅適用於不可拆分的格式(如gz),因此具有識別功能的通用解決方案沒有多大意義。 –