2016-04-28 19 views
1

我是Spark Streaming中的新成員。如何使用ssc.filestream()處理Java中的zip目錄

我想要監視和解壓所有.zip文件在特定的目錄中。 我已引用http://cutler.io/2012/07/hadoop-processing-zip-files-in-mapreduce/寫下面的代碼

JavaPairInputDStream<Text, BytesWritable> streamlogFiles=ssc.fileStream(logDir, Text.class, BytesWritable.class, ZipFileInputFormat.class); 

然而,我發現,FILESTREAM()不處理zip文件exsitedin /移動到指定的目錄。

有什麼我想念的嗎?

回答

0

您可以使用這裏的ZipFileInputFormat:https://github.com/cotdp/com-cotdp-hadoop/tree/master/src/main/java/com/cotdp/hadoop

,並使用

val files = ssc.fileStream[Text, BytesWritable, ZipFileInputFormat](someInputDirectory) 

files.foreachRDD{ rdd => 
    rdd.foreachPartition { partition => 
    partition.foreach { record => 
     process(record._1.toString, record._2) 
    } 
    } 
} 

哪裏record._1.toString是文件名作爲record._2是BytesWriteable該文件的FILESTREAM。如果你不想讓InputFormat解壓縮.zip,你需要一個不同的自定義FileInputFormat或者不得不修改ZipFileInputFormat。

爲了測試這一點,請確保您添加到someInputDirectory的.zip文件是在1分鐘前最後修改的<,否則SparkStreaming會在默認情況下忽略它。