2015-12-02 22 views
1

我遇到過錯誤:rdd動作會暫停在DStream foreachRDD函數中。rdd動作會暫停在DStream foreachRDD函數中

請參考以下代碼。

import _root_.kafka.common.TopicAndPartition 
import _root_.kafka.message.MessageAndMetadata 
import _root_.kafka.serializer.StringDecoder 
import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 


object StreamingTest { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount") 
    val sc = new SparkContext(conf) 
    val ssc = new StreamingContext(sc, Seconds(5)) 

    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092") 

    val topicOffset = Map(TopicAndPartition("test_log",0)->200000L) 
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => mmd.message 
    val kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,String](ssc,kafkaParams,topicOffset,messageHandler) 

    kafkaStream.foreachRDD(rdd=>{ 
     println(rdd.count()) 
     val collected = rdd.collect() 
    }) 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

錯誤:

功能rdd.count()rdd.collect()將被暫停。我使用spark版本是1.4.1。

我是否以錯誤的方式使用它?

在此先感謝。

+0

你得到了什麼錯誤? – Kaushal

+0

沒有顯示錯誤。 – andrew

+0

你在羣集模式下運行spark嗎? –

回答

1

如果我們沒有從kafka設置maxRatePerPartition,它會嘗試讀取所有數據,因此它會看起來像掛起。但它實際上忙於閱讀數據。

後,我設置以下配置

spark.streaming.kafka.maxRatePerPartition=1000 

將打印日誌。