我試過用api spark.read.csv
來讀取帶有擴展名bz
或gzip
的壓縮csv文件。有效。但在源代碼中,我找不到任何可以聲明codec
類型的選項參數。Spark SQL如何讀取壓縮的csv文件?
即使在這link,在書面方面只有設置codec
。任何人都可以告訴我,或給出源代碼的路徑,顯示spark 2.x版本如何處理壓縮的csv文件。
我試過用api spark.read.csv
來讀取帶有擴展名bz
或gzip
的壓縮csv文件。有效。但在源代碼中,我找不到任何可以聲明codec
類型的選項參數。Spark SQL如何讀取壓縮的csv文件?
即使在這link,在書面方面只有設置codec
。任何人都可以告訴我,或給出源代碼的路徑,顯示spark 2.x版本如何處理壓縮的csv文件。
所有與文本相關的數據源(包括CSVDataSource)都使用Hadoop File API來處理文件(它也位於Spark Core的RDD中)。
您可以在readFile相關線路,導致HadoopFileLinesReader它具有以下行:
val fileSplit = new FileSplit(
new Path(new URI(file.filePath)),
file.start,
file.length,
// TODO: Implement Locality
Array.empty)
使用Hadoop的org.apache.hadoop.fs.Path與底層文件(S)的壓縮交易。
快速google搜索後,我能夠找到與壓縮是mapreduce.output.fileoutputformat.compress
涉及Hadoop的財產。
,導致我星火SQL的CompressionCodecs具有以下壓縮配置:
"none" -> null,
"uncompressed" -> null,
"bzip2" -> classOf[BZip2Codec].getName,
"deflate" -> classOf[DeflateCodec].getName,
"gzip" -> classOf[GzipCodec].getName,
"lz4" -> classOf[Lz4Codec].getName,
"snappy" -> classOf[SnappyCodec].getName)
代碼
下面,你可以找到setCodecConfiguration使用「我們」的選項。
def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
if (codec != null) {
conf.set("mapreduce.output.fileoutputformat.compress", "true")
conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
conf.set("mapreduce.map.output.compress", "true")
conf.set("mapreduce.map.output.compress.codec", codec)
} else {
// This infers the option `compression` is set to `uncompressed` or `none`.
conf.set("mapreduce.output.fileoutputformat.compress", "false")
conf.set("mapreduce.map.output.compress", "false")
}
}
另一種方法getCodecClassName用於解決compression
選項JSON,CSV,和text格式。
您不必爲gz
壓縮的csv
,tsv
文件做任何特殊處理,以便通過spark 2.x
版本讀取。下面的代碼試圖與spark 2.0.2
val options= Map("sep" -> ",")
val csvRDD = spark.read.options(options).csv("file.csv.gz")
我也有類似的做了製表符分隔的GZ文件
val options= Map("sep" -> "\t")
val csvRDD = spark.read.options(options).csv("file.tsv.gz")
您也可以指定文件夾來讀取多張.gz
文件與解壓縮的文件組合
val csvRDD = spark.read.options(options).csv("https://stackoverflow.com/users/mithun/tsvfilelocation/")
謝謝,夥計。我檢查了'Path'軟件包文件,仍然很困惑。如果你能提供更多的細節,這將是非常善良的;比如'Path'包中的哪一部分處理壓縮。 THX再次。 –
添加了一些Spark SQL代碼處理壓縮的附加鏈接。由於我對Hadoop的源代碼一無所知,因此我將留下探索它作爲您的家庭練習。 –
非常感謝你的耐心和善良。我通過鏈閱讀了'getCodecClassName'代碼。我發現那部分代碼僅在寫入方面被調用。在閱讀方面我沒有找到用法。我認爲這項工作可能由文件系統完成;但沒有找到證據。 –