我正在研究需要從卡夫卡讀取數據的火花應用程序。我創建了一個卡夫卡主題,製作人發佈消息。我從控制檯消費者驗證消息已成功發佈。爲什麼我的Spark Streaming應用程序不打印來自Kafka的記錄數(使用count運算符)?
我寫了一個短的應用程序來讀取來自卡夫卡的數據,但它沒有得到任何數據。 以下是我使用的代碼:
def main(args: Array[String]): Unit = {
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
process(lines) // prints the number of records in Kafka topic
ssc.start()
ssc.awaitTermination()
}
private def process(lines: DStream[String]) {
val z = lines.count()
println("count of lines is "+z)
//edit
lines.foreachRDD(rdd => rdd.map(println)
// <-- Why does this **not** print?
)
如何解決這個問題有什麼建議?
******編輯****
我用
lines.foreachRDD(rdd => rdd.map(println)
以及在實際代碼但也不能正常工作。我在後面提到的保留期限如下:Kafka spark directStream can not get data。但問題仍然存在。
我在實際代碼中也使用了Output操作符。我編輯了原始問題以表明問題仍然存在。現在有什麼建議? – Alok
嘿嘿,你是「交易」一個沒有輸出的操作員到另一個:)你可以先請使用'lines.count()。print'而不是'lines.count()'?我相信你會得到10個記錄打印出來的控制檯。至於RDD的情況,請使用'rdd.foreach(println)'(而不是'rdd.map(println)'這是一個轉換)。玩的開心! :) –