2016-10-14 27 views
1

我編寫了一個火花流應用程序,通過使用KafkaUtils從Kafka接收數據,我想要做的是打印出我從Kafka收到的數據。這裏是我的代碼(我使用spark-submit來執行我的火花流作業):在火花流媒體中打印RDD到控制檯

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
messages.print() 

當我運行它時,它工作得很好。如果輸入的是,B,C卡夫卡製片人,我可以從星火流如下得到的結果:

Time: 1476481700000 ms 

------------------------------------------- 
(null,a) 
(null,b) 
(null,c) 

但是,如果我加一條線來計算行數,messages.print()不能工作。碼如下所示:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
messages.print() 
messages.count().print() 

我得到以下結果:

------------------------------------------- 
Time: 1476481800000 ms 
------------------------------------------- 
4 

只有計數的數量變得越來越打印出來,並且數據不能被打印出來。 我的問題是爲什麼messages.print()在我添加messages.count.print()後不會執行。
另一個問題是什麼null代表元組(null, a)(null, b)(null, c)

回答

0

print()沒有問題,它將打印兩個消息並計算如下。滾動並檢查您的日誌。

------------------------------------------- 
Time: 1476481700000 ms 
------------------------------------------- 
(null,a) 
(null,b) 
(null,c) 

------------------------------------------- 
Time: 1476481800000 ms 
------------------------------------------- 
4 

KafkaUtils.createDirectStream方法返回的<Kafka topic, Kafka message> DSTREAM。檢查thisthis與主題相關的帖子爲空。

+0

實現最終目標,是的,你是對的。他們打印出來,但我以前沒有看到他們。謝謝 – Frankie

0

你的代碼應該可以工作,但會給你一個選擇。但這種方法只適用於測試或學習。相反,執行兩個actions的,你可以只用單一action

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
    //Cache your RDD before you perform any heavyweight operations. 
    messages.cache() 
    val result = messages.collect(); 
    println(result.size + " size") 
    result.foreach { input => println(input) }