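Spark Streaming from Kafka and writing to HDFS in Avro format

I basically want to consume data from Kafka and write it to HDFS. What happens instead is that nothing useful gets written to HDFS: only empty files are created.
Please also tell me how to modify the code if I want to write to HDFS in Avro format.
For simplicity, I am writing to the local C drive here.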
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaStreaming extends App {
  val conf = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("kafka-streaming")
  val conext = new SparkContext(conf)
  // Batch interval of 1 millisecond
  val ssc = new StreamingContext(conext, org.apache.spark.streaming.Milliseconds(1))

  // Kafka consumer settings: string keys and values, start from the latest offsets
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "group",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (true: java.lang.Boolean))

  val topics = Array("topic")
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams))

  // Message values only; note that `lines` is not used below
  val lines = stream.map(_.value)

  // Every batch, including empty ones, is written to the same directory
  stream.foreachRDD(rdd => {
    rdd.coalesce(1).saveAsTextFile("C:/data/spark/")
  })

  ssc.start()
  ssc.awaitTermination()
}
And below is the build.sbt:
name := "spark-streaming"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.11.0.1"
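
For the Avro part of the question, here is a minimal sketch rather than a definitive implementation. It assumes two extra dependencies that are not in the build above: `spark-sql_2.11` 2.2.0 and the Databricks `spark-avro_2.11` 4.0.0 package. The object `AvroSink`, the method `writeAsAvro`, and the column name `value` are hypothetical names chosen for illustration. Each non-empty batch is converted to a single-column DataFrame and written in append mode, so later batches add files to the output directory instead of replacing it.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.DStream

// Assumed extra build.sbt dependencies (not in the original build):
//   "org.apache.spark" % "spark-sql_2.11"  % "2.2.0"
//   "com.databricks"   % "spark-avro_2.11" % "4.0.0"
object AvroSink {
  // Data source name registered by the Databricks spark-avro package
  val AvroFormat = "com.databricks.spark.avro"

  // Hypothetical helper: writes every non-empty batch of strings as Avro files.
  def writeAsAvro(lines: DStream[String], outputDir: String): Unit =
    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Reuse (or lazily create) a SparkSession on top of the existing SparkContext
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        rdd.toDF("value")          // single string column named "value"
          .write
          .format(AvroFormat)
          .mode(SaveMode.Append)   // add new files instead of replacing the directory
          .save(outputDir)
      }
    }
}
```

With the code from the question, this would be called before `ssc.start()` as `AvroSink.writeAsAvro(stream.map(_.value), "C:/data/spark-avro/")`, where the output path is only an example; an `hdfs://` URI should work the same way.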
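All it needed was a filter to check whether the batch is empty: stream.map(_.value).foreachRDD(rdd => { rdd.foreach(println); if (!rdd.isEmpty()) { rdd.saveAsTextFile("C:/data/spark/") } }) But the problem I still face is that each new batch overwrites the old data. I want all the data to be appended to the file.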
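
One possible way around the overwriting, sketched under assumptions: `saveAsTextFile` writes a directory of part files rather than appending to a single file, so a common workaround is to give every batch its own directory derived from the batch time. The object `BatchOutput` and its methods `pathFor` and `saveEachBatch` are hypothetical names, not part of Spark.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object BatchOutput {
  // Hypothetical helper: one output directory per batch, named after the batch time.
  def pathFor(baseDir: String, batchMillis: Long): String =
    s"${baseDir.stripSuffix("/")}/batch-$batchMillis"

  // Saves each non-empty batch into its own directory so earlier output is kept.
  def saveEachBatch(lines: DStream[String], baseDir: String): Unit =
    lines.foreachRDD { (rdd: RDD[String], time: Time) =>
      if (!rdd.isEmpty()) {
        rdd.saveAsTextFile(pathFor(baseDir, time.milliseconds))
      }
    }
}
```

Usage would look like `BatchOutput.saveEachBatch(stream.map(_.value), "C:/data/spark/")`. Spark's built-in `DStream.saveAsTextFiles(prefix, suffix)` also produces one directory per batch, but as far as I know it does not skip empty batches.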