2017-04-13 68 views
1

我正在使用Spark 2.1和Kafka 0.08.xx來執行Spark Streaming作業。這是一個文本過濾工作,在過程中大部分文本都會被過濾掉。爲什麼Spark Streaming + Kafka中的foreachRDD速度緩慢?

  1. 請直接DirectStream的輸出濾波:我用兩種不同的方式實現

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 
    val jsonMsg = messages.map(_._2) 
    val filteredMsg = jsonMsg.filter(x=>x.contains(TEXT1) && x.contains(TEXT2) && x.contains(TEXT3)) 
    
  2. 使用foreachRDD功能

    messages.foreachRDD { rdd => 
          val record = rdd.map(_.2).filter(x => x.contains(TEXT1) && 
                   x.contains(TEXT2) && 
                   x.contains(TEXT3))} 
    

我發現第一種方法明顯比第二種方法快,但我不確定這是否是常見情況。

方法1和方法2是否有區別?

回答

1

filter是一種轉變。轉換懶洋洋地評估,也就是說,直到你執行一個動作,他們沒有做任何事情,如foreachRDD,寫數據等

所以在1實際上什麼也沒發生,因此顯著快於二,這是使用動作foreachRDD做些事情。

相關問題