0
我想並行讀取Kafka消息,從而並行處理它們。我的卡夫卡話題有10個分區。我試圖創建5個DStream並應用Union
方法來操作單個DStream。這裏是我試過至今代碼:在Kafka Spark流中讀取並處理並行性
def main(args: scala.Array[String]): Unit = {
val properties = readProperties()
val streamConf = new SparkConf().setMaster("local[2]").setAppName("KafkaStream")
val ssc = new StreamingContext(streamConf, Seconds(1))
// println("defaultParallelism: "+ssc.sparkContext.defaultParallelism)
ssc.sparkContext.setLogLevel("WARN")
val numPartitionsOfInputTopic = 5
val group_id = Random.alphanumeric.take(4).mkString("consumer_group")
val kafkaStream = {
val kafkaParams = Map("zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"),
"group.id" -> group_id,
"zookeeper.connection.timeout.ms" -> "3000")
val streams = (1 to numPartitionsOfInputTopic).map { _ =>
KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
ssc, kafkaParams, Map("kafka_topic" -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
}
val unifiedStream = ssc.union(streams)
val sparkProcessingParallelism = 5
unifiedStream.repartition(sparkProcessingParallelism)
}
kafkaStream.foreachRDD { x =>
x.foreach {
msg => println("Message: "+msg)
processMessage(msg)
}
}
ssc.start()
ssc.awaitTermination()
}
在執行時,它甚至沒有收到一個消息,更不用說進一步處理它。我在這裏錯過了什麼嗎?如果需要,請建議進行更改。謝謝。
Gaweda嗨,謝謝你提出一個替代方案。我只是嘗試直接流方法。儘管如此,這些消息依次得到處理。處理時間實際上不到一秒鐘。爲了測試它,我在消息處理函數中加入了'Thread.sleep(10000)'。這裏是我正在形成的KafkaParams的地圖: 'val kafkaParams = Map(「metadata.broker.list」 - > localhost:9092, 「group.id」 - >「dsdc」, 「auto.offset。重置「 - >」最大「)' 我需要改變一些東西嗎? – Arjun