0

如何保存卡夫卡星火消息流數據幀到單個文件如何保存卡夫卡星火消息流數據幀到單個文件

我已經制定,這將消耗使用Kafka-星火流過程中的信息的應用程序。

一旦收到數據,它就會轉換成數據幀。

然後流式數據幀被保存爲文本文件,這裏數據幀被保存到每個文件中,用於每個kafka流消息,下面是我用於將數據保存爲文本文件的代碼,這是保存數據爲每條消息添加多個文本文件。

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append") 
           .save("path") 

在這裏,我想實現的是流日期框架的要求需要保存爲每個卡夫卡消息的單個文件,如果可能的話,請幫我解決。

在此先感謝

回答

0

下面的代碼可能會對您有所幫助。只需生成RDD列表,然後將其合併即可。

var dStreamRDDList = new ListBuffer[RDD[String]] 
dStream.foreachRDD(rdd => 
    { 
     dStreamRDDList += rdd 
    }) 
val joinRDD = ssc.sparkContext.union(dStreamRDDList) 
//then convert joinRDD to DataFrame (DF) 
DF.coalesce(1).write.format("com.databricks.spark.csv").mode("append") 
          .save("path") 
相關問題