Kafka createDirectStream in Spark Streaming

I am trying the example code from the Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher). The code runs without any errors, but I never receive any records. If I run kafka-console-consumer.sh from the beginning, I do get the records. Does anyone know the reason? My code is as follows:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// StreamingContext setup is not shown in the original snippet; the
// app name and the 5-second batch interval here are assumptions.
val conf = new SparkConf().setAppName("SparkStreamingLoad")
val ssc = new StreamingContext(conf, Seconds(5))

val broker = "221.181.73.44:19092"
val topics = Array("connect-test")
val groupid = "SparkStreamingLoad3"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> broker,
  "group.id" -> groupid,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "earliest", // earliest | latest
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
stream.print()
ssc.start()
ssc.awaitTermination()
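Since kafka-console-consumer.sh does return records, a plain kafka-clients consumer run outside Spark can help isolate whether the problem is in the Spark layer or in the Kafka setup. Below is a minimal sketch assuming the same broker, topic, and settings as above; the object name, the fresh group id, the poll timeout, and the loop count are illustrative choices, not part of the original question:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object PlainConsumerCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "221.181.73.44:19092")
    // A fresh group id so that auto.offset.reset = earliest actually applies
    props.put("group.id", "PlainConsumerCheck")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")
    props.put("enable.auto.commit", "false")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("connect-test"))
    try {
      // Poll a few times; each poll returns whatever arrives within the timeout
      for (_ <- 1 to 10) {
        val records = consumer.poll(1000L) // poll(long) is the 0.10.x API
        records.asScala.foreach { r =>
          println(s"partition=${r.partition} offset=${r.offset} value=${r.value}")
        }
      }
    } finally {
      consumer.close()
    }
  }
}

If this standalone consumer sees records but the Spark job does not, the problem is more likely in the Spark/Kafka integration jars than in the broker.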
My SBT build configuration is:
version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.10" % "2.1.0",
  "org.apache.spark" % "spark-core_2.10" % "2.1.0",
  "org.apache.spark" % "spark-streaming_2.10" % "2.1.0",
  "org.apache.kafka" % "kafka_2.10" % "0.10.2.1"
)
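For comparison, here is a trimmed build.sbt sketch for this stack; this is an assumption about the intended setup, not a confirmed fix. The %% operator lets SBT derive the _2.10 suffix from scalaVersion, and spark-streaming-kafka-0-10 already depends on a kafka-clients of the 0.10 line, so the explicit kafka_2.10 entry may be redundant and can put a second client version on the classpath:

version := "1.0"

scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // %% picks the _2.10 artifacts based on scalaVersion above
  "org.apache.spark" %% "spark-core"                 % "2.1.0",
  "org.apache.spark" %% "spark-streaming"            % "2.1.0",
  // Already pulls in kafka-clients; a separate kafka_2.10 entry is
  // usually unnecessary and can drag in a conflicting client version
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
)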
Thanks!
Is the Kafka you are running also a 0.10.x version? – maasg
The Kafka version running on the server is 0.10.2.1. In the libs folder, I have the kafka_2.10-0.10.2.1.* files. The version is the same as in the SBT build configuration. – rsmin
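One way to verify which kafka-clients version the Spark driver actually loads (as opposed to what sits in the broker's libs folder) is to print it at runtime. A small diagnostic sketch, assuming kafka-clients is on the classpath; AppInfoParser is the class Kafka itself uses to report its version:

import org.apache.kafka.common.utils.AppInfoParser

// Prints the kafka-clients version visible to this JVM; if it differs from
// the broker's 0.10.2.1, the classpath is mixing client versions.
println(s"kafka-clients version: ${AppInfoParser.getVersion}")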