我正在使用creatDirectStream爲了整合SparkStreaming和Kafka。這裏是我使用的代碼:kafka將sparkstreaming到HDFS
val ssc = new StreamingContext(new SparkConf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "sandbox:6667")
val topics = Set("topic1")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
現在我想存儲消息到HDFS。這樣做是對的嗎?
messages.saveAsTextFiles("/tmp/spark/messages")
是的,這是正常的 –