0

我想並行讀取Kafka消息,從而並行處理它們。我的卡夫卡話題有10個分區。我試圖創建5個DStream並應用Union方法來操作單個DStream。這裏是我試過至今代碼:在Kafka Spark流中讀取並處理並行性

def main(args: scala.Array[String]): Unit = { 

    val properties = readProperties() 

    val streamConf = new SparkConf().setMaster("local[2]").setAppName("KafkaStream") 
    val ssc = new StreamingContext(streamConf, Seconds(1)) 
    // println("defaultParallelism: "+ssc.sparkContext.defaultParallelism) 
    ssc.sparkContext.setLogLevel("WARN") 
    val numPartitionsOfInputTopic = 5 
    val group_id = Random.alphanumeric.take(4).mkString("consumer_group") 
    val kafkaStream = { 
     val kafkaParams = Map("zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"), 
          "group.id" -> group_id, 
          "zookeeper.connection.timeout.ms" -> "3000")      

     val streams = (1 to numPartitionsOfInputTopic).map { _ => 
          KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
          ssc, kafkaParams, Map("kafka_topic" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2) 
     } 
    val unifiedStream = ssc.union(streams) 
    val sparkProcessingParallelism = 5 
    unifiedStream.repartition(sparkProcessingParallelism) 
    } 

    kafkaStream.foreachRDD { x => 
    x.foreach {  
     msg => println("Message: "+msg) 
     processMessage(msg) 
    }  
    } 

    ssc.start() 
    ssc.awaitTermination() 
} 

在執行時,它甚至沒有收到一個消息,更不用說進一步處理它。我在這裏錯過了什麼嗎?如果需要,請建議進行更改。謝謝。

回答

0

我強烈建議切換到直接流。爲什麼?

Direct Stream默認將並行度設置爲您在Kafka中分區的數量。沒有什麼必須做更多工作 - 只是創建直接流如果您創建5個DStreams做你的工作:)

,默認情況下將在5線程讀取,一個非直接DSTREAM =一個線程

+0

Gaweda嗨,謝謝你提出一個替代方案。我只是嘗試直接流方法。儘管如此,這些消息依次得到處理。處理時間實際上不到一秒鐘。爲了測試它,我在消息處理函數中加入了'Thread.sleep(10000)'。這裏是我正在形成的KafkaParams的地圖: 'val kafkaParams = Map(「metadata.broker.list」 - > localhost:9092, 「group.id」 - >「dsdc」, 「auto.offset。重置「 - >」最大「)' 我需要改變一些東西嗎? – Arjun