2017-06-28 79 views
0

我試過用api spark.read.csv來讀取帶有擴展名bzgzip的壓縮csv文件。有效。但在源代碼中,我找不到任何可以聲明codec類型的選項參數。Spark SQL如何讀取壓縮的csv文件?

即使在這link,在書面方面只有設置codec。任何人都可以告訴我,或給出源代碼的路徑,顯示spark 2.x版本如何處理壓縮的csv文件。

回答

1

所有與文本相關的數據源(包括CSVDataSource)都使用Hadoop File API來處理文件(它也位於Spark Core的RDD中)。

您可以在readFile相關線路,導致HadoopFileLinesReader它具有以下行:

val fileSplit = new FileSplit(
    new Path(new URI(file.filePath)), 
    file.start, 
    file.length, 
    // TODO: Implement Locality 
    Array.empty) 

使用Hadoop的org.apache.hadoop.fs.Path與底層文件(S)的壓縮交易。


快速google搜索後,我能夠找到與壓縮是mapreduce.output.fileoutputformat.compress涉及Hadoop的財產。

,導致我星火SQL的CompressionCodecs具有以下壓縮配置:

"none" -> null, 
"uncompressed" -> null, 
"bzip2" -> classOf[BZip2Codec].getName, 
"deflate" -> classOf[DeflateCodec].getName, 
"gzip" -> classOf[GzipCodec].getName, 
"lz4" -> classOf[Lz4Codec].getName, 
"snappy" -> classOf[SnappyCodec].getName) 
代碼

下面,你可以找到setCodecConfiguration使用「我們」的選項。

def setCodecConfiguration(conf: Configuration, codec: String): Unit = { 
    if (codec != null) { 
     conf.set("mapreduce.output.fileoutputformat.compress", "true") 
     conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) 
     conf.set("mapreduce.output.fileoutputformat.compress.codec", codec) 
     conf.set("mapreduce.map.output.compress", "true") 
     conf.set("mapreduce.map.output.compress.codec", codec) 
    } else { 
     // This infers the option `compression` is set to `uncompressed` or `none`. 
     conf.set("mapreduce.output.fileoutputformat.compress", "false") 
     conf.set("mapreduce.map.output.compress", "false") 
    } 
    } 

另一種方法getCodecClassName用於解決compression選項JSONCSV,和text格式。

+0

謝謝,夥計。我檢查了'Path'軟件包文件,仍然很困惑。如果你能提供更多的細節,這將是非常善良的;比如'Path'包中的哪一部分處理壓縮。 THX再次。 –

+0

添加了一些Spark SQL代碼處理壓縮的附加鏈接。由於我對Hadoop的源代碼一無所知,因此我將留下探索它作爲您的家庭練習。 –

+1

非常感謝你的耐心和善良。我通過鏈閱讀了'getCodecClassName'代碼。我發現那部分代碼僅在寫入方面被調用。在閱讀方面我沒有找到用法。我認爲這項工作可能由文件系統完成;但沒有找到證據。 –

0

您不必爲gz壓縮的csv,tsv文件做任何特殊處理,以便通過spark 2.x版本讀取。下面的代碼試圖與spark 2.0.2

val options= Map("sep" -> ",") 
val csvRDD = spark.read.options(options).csv("file.csv.gz") 

我也有類似的做了製表符分隔的GZ文件

val options= Map("sep" -> "\t") 
val csvRDD = spark.read.options(options).csv("file.tsv.gz") 

您也可以指定文件夾來讀取多張.gz文件與解壓縮的文件組合

val csvRDD = spark.read.options(options).csv("https://stackoverflow.com/users/mithun/tsvfilelocation/")