2015-04-03 64 views
7

我有一個使用bzip2壓縮的wikipedia轉儲(從http://dumps.wikimedia.org/enwiki/下載),但我不想解壓縮它:我想在進行解壓縮的同時處理它。針對Apache Flink的BZip2壓縮輸入

我知道可以用普通的Java來完成它(例如見Java - Read BZ2 file and uncompress/parse on the fly),但是我想知道它在Apache Flink中是如何做到的?我可能需要的是類似於https://github.com/whym/wikihadoop但對於Flink而不是Hadoop。

回答

5

有可能在阿帕奇弗林克以下格式讀取壓縮文件:

org.apache.hadoop.io.compress.BZip2Codec 
org.apache.hadoop.io.compress.DefaultCodec 
org.apache.hadoop.io.compress.DeflateCodec 
org.apache.hadoop.io.compress.GzipCodec 
org.apache.hadoop.io.compress.Lz4Codec 
org.apache.hadoop.io.compress.SnappyCodec 

你可以從包名看,弗林克這是否使用Hadoop的InputFormats。 這是閱讀使用弗林克的斯卡拉API GZ文件的示例: (你至少需要弗林克0.8.1)

def main(args: Array[String]) { 

    val env = ExecutionEnvironment.getExecutionEnvironment 
    val job = new JobConf() 
    val hadoopInput = new TextInputFormat() 
    FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz")) 
    val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job) 

    lines.print 

    env.execute("Read gz files") 
} 

阿帕奇弗林克只有內建支持.deflate文件。添加對更多壓縮編解碼器的支持很容易,但尚未完成。

將HadoopInputFormats與Flink結合使用不會導致任何性能損失。 Flink擁有對Hadoop的Writable類型的內建序列化支持。