2017-06-14 91 views
1

也許有人在某個項目中使用了這個:我從Spark中寫給Cassandra,在Spark中我使用kafkaUtils.createDirectStream。通過Spark-Cassandra連接器,我們可以使用Dstream.saveToCassandra方法。但保存/追加到HDFS我用:從kafka到hdfs通過火花

stream.map(_.value).foreachRDD(rdd => { 
    val conf = new Configuration() 
    conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000/") 
    val fs = FileSystem.get(conf) 
    fs.append(new Path("textfile.txt")) 
    .write(rdd.collect().mkString.getBytes()) 
    fs.close() 
}) 

但我不認爲這是做到這一點的最好辦法。這可能是更好的使用類似:

val prepStr = { 
    val str = new StringBuilder 
    if (!rdd.isEmpty()) { 
    str.append(rdd.collect().mkString) 
    } 
    str 
} 

最後:

fs.append(path).write(prepStr.mkString.getBytes()) 

或許有人用另一種方式?

回答

1

假設你的流型的DStream[String]你可以使用由數據框作家提供的附加功能:

dstream.foreachRDD{rdd => 
    import sparkSession.implicits._ 
    val df = rdd.toDF() 
    df.write.mode("append").text("/hdfs/path/to/file") 
} 
+0

謝謝,我會嘗試這樣的方式來測試這個 –

+0

不,它寫的文件在路徑中,但不附加到現有文件 –

+0

它追加到您可以讀回的邏輯文件。但的確,它將由許多內部分區組成。 – maasg