2016-03-08 97 views
2

我正在使用Spark Streaming + Kafka將數據攝入到HDFS中。使用Spark Streaming + Kafka HDFS中的空文件夾

val ssc = new StreamingContext(sparkContext, Seconds(30)) 
val messageRecBased = KafkaUtils.createStream(ssc, zkQuorum, group, topic) 
    .map(_._2) 

每隔30秒,卡夫卡隊列中的所有數據將存儲在HDFS中的單獨文件夾中。某些文件夾包含一個名爲part-00000的空文件,因爲在相應的批間隔(30秒)內沒有數據。 我用以下行來過濾這些文件夾:

messageRecBased.filter { x => x.size == 0 } 
messageRecBased.repartition(1).saveAsTextFiles("PATH") 

,但它不工作,它仍然會產生與空文件夾。

回答

1

如果你看一下DStream.saveAsTextFiles()方法定義,它只是派生的RDD.saveAsObjectFile每個在DirectKafkaInputDStreamRDD的。

def saveAsObjectFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope { 
    val saveFunc = (rdd: RDD[T], time: Time) => { 
     val file = rddToFileName(prefix, suffix, time) 
     rdd.saveAsObjectFile(file) 
    } 
    this.foreachRDD(saveFunc) 
    } 

因此,而不是使用DStream.saveAsTextFiles(),你可以選擇寫你自己喜歡的事:

messageRecBased.foreachRDD{ rdd => 
    rdd.repartition(1) 
    if(!rdd.isEmpty) 
     rdd.saveAsObjectFile("FILE_PATH") 
} 
+0

謝謝。它解決了用空文件創建文件夾的問題。現在我有一個包含三個文件的文件夾:part-00000,part-00001,part00002。令人驚訝的消息將在這三個文件中被覆蓋!所以我沒有一個包含所有消息的文件,我有三個文件和最後三條消息!你知道爲什麼嗎? – RedNay8080

+0

你在卡夫卡有多少主題? – Mohitt

+0

我只有一個話題。 – RedNay8080

0

你可以檢查分區是否爲空,如果不是,那麼只保存RDD,就像這樣。該代碼應該防止空的RDD節省。

messageRecBased.partitions.isEmpty 
+1

你的意思messageRecBased.foreachRDD(X =>如果(x.partitions.isEmpty){X! .saveAsTextFile(msgFileText)})?因爲「分區」不適用於Dstream [String] – RedNay8080

+0

@Sunil;在DStream中沒有分區()方法。 – Mohitt

相關問題