我正在使用Spark Streaming + Kafka將數據攝入到HDFS中。使用Spark Streaming + Kafka HDFS中的空文件夾
val ssc = new StreamingContext(sparkContext, Seconds(30))
val messageRecBased = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
.map(_._2)
每隔30秒,卡夫卡隊列中的所有數據將存儲在HDFS中的單獨文件夾中。某些文件夾包含一個名爲part-00000的空文件,因爲在相應的批間隔(30秒)內沒有數據。 我用以下行來過濾這些文件夾:
messageRecBased.filter { x => x.size == 0 }
messageRecBased.repartition(1).saveAsTextFiles("PATH")
,但它不工作,它仍然會產生與空文件夾。
謝謝。它解決了用空文件創建文件夾的問題。現在我有一個包含三個文件的文件夾:part-00000,part-00001,part00002。令人驚訝的消息將在這三個文件中被覆蓋!所以我沒有一個包含所有消息的文件,我有三個文件和最後三條消息!你知道爲什麼嗎? – RedNay8080
你在卡夫卡有多少主題? – Mohitt
我只有一個話題。 – RedNay8080