有可能在阿帕奇弗林克以下格式讀取壓縮文件:
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
你可以從包名看,弗林克這是否使用Hadoop的InputFormats。 這是閱讀使用弗林克的斯卡拉API GZ文件的示例: (你至少需要弗林克0.8.1)
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val job = new JobConf()
val hadoopInput = new TextInputFormat()
FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
lines.print
env.execute("Read gz files")
}
阿帕奇弗林克只有內建支持.deflate文件。添加對更多壓縮編解碼器的支持很容易,但尚未完成。
將HadoopInputFormats與Flink結合使用不會導致任何性能損失。 Flink擁有對Hadoop的Writable
類型的內建序列化支持。