1
也許有人在某個項目中使用了這個:我從Spark中寫給Cassandra,在Spark中我使用kafkaUtils.createDirectStream
。通過Spark-Cassandra連接器,我們可以使用Dstream.saveToCassandra
方法。但保存/追加到HDFS我用:從kafka到hdfs通過火花
stream.map(_.value).foreachRDD(rdd => {
val conf = new Configuration()
conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000/")
val fs = FileSystem.get(conf)
fs.append(new Path("textfile.txt"))
.write(rdd.collect().mkString.getBytes())
fs.close()
})
但我不認爲這是做到這一點的最好辦法。這可能是更好的使用類似:
val prepStr = {
val str = new StringBuilder
if (!rdd.isEmpty()) {
str.append(rdd.collect().mkString)
}
str
}
最後:
fs.append(path).write(prepStr.mkString.getBytes())
或許有人用另一種方式?
謝謝,我會嘗試這樣的方式來測試這個 –
不,它寫的文件在路徑中,但不附加到現有文件 –
它追加到您可以讀回的邏輯文件。但的確,它將由許多內部分區組成。 – maasg