2016-11-28 43 views
1

我正在研究需要從卡夫卡讀取數據的火花應用程序。我創建了一個卡夫卡主題,製作人發佈消息。我從控制檯消費者驗證消息已成功發佈。爲什麼我的Spark Streaming應用程序不打印來自Kafka的記錄數(使用count運算符)?

我寫了一個短的應用程序來讀取來自卡夫卡的數據,但它沒有得到任何數據。 以下是我使用的代碼:

def main(args: Array[String]): Unit = { 
    val Array(zkQuorum, group, topics, numThreads) = args 
    val sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[2]") 
    val ssc = new StreamingContext(sparkConf, Seconds(2)) 

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 

    process(lines) // prints the number of records in Kafka topic 

    ssc.start() 
    ssc.awaitTermination() 
} 

private def process(lines: DStream[String]) { 
    val z = lines.count() 
    println("count of lines is "+z) 
    //edit 
    lines.foreachRDD(rdd => rdd.map(println) 
    // <-- Why does this **not** print? 
) 

如何解決這個問題有什麼建議?

******編輯****

我用

lines.foreachRDD(rdd => rdd.map(println) 

以及在實際代碼但也不能正常工作。我在後面提到的保留期限如下:Kafka spark directStream can not get data。但問題仍然存在。

回答

1

processDStream管道與沒有那個被執行每批次間隔的管道輸出操作的延續。

可以通過「看」,它通過讀取count operator簽名:引述count's scaladoc

count(): DStream[Long] 

返回一個新DSTREAM,其中每個RDD,由各個計數產生一個單一的元素此DStream的RDD。

所以,你有一個卡夫卡記錄的dstream,你轉換爲單個值(作爲count的結果)的dstream。沒有多少產出(對控制檯或任何其他水槽)。

你必須使用輸出操作員的官方文檔Output Operations on DStreams中描述結束管道:

輸出操作允許DSTREAM的數據推給外部系統,如數據庫或文件系統。由於輸出操作實際上允許外部系統使用轉換後的數據,因此它們會觸發所有DStream轉換的實際執行(類似於RDD的操作)。

(低電平)輸出運營商註冊輸入dstreams作爲輸出dstreams以便執行可以開始。根據設計,Spark Streaming的DStream沒有作爲輸出dstream的概念。知道並能夠區分輸入和輸出流是DStreamGraph

+0

我在實際代碼中也使用了Output操作符。我編輯了原始問題以表明問題仍然存在。現在有什麼建議? – Alok

+0

嘿嘿,你是「交易」一個沒有輸出的操作員到另一個:)你可以先請使用'lines.count()。print'而不是'lines.count()'?我相信你會得到10個記錄打印出來的控制檯。至於RDD的情況,請使用'rdd.foreach(println)'(而不是'rdd.map(println)'這是一個轉換)。玩的開心! :) –

相關問題