2017-06-22 118 views
0

我在嘗試Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)的示例代碼。該代碼可以運行沒有任何錯誤,但我不能收到任何記錄。如果我從頭開始運行kafka-console-consumer.sh,我可以獲得記錄。有誰知道原因?我的代碼如下:在Spark Streaming中的Kafka createDirectStream

val broker = "221.181.73.44:19092" 
val topics = Array("connect-test") 
val groupid = "SparkStreamingLoad3" 
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> broker, 
    "group.id" -> groupid, 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[StringDeserializer], 
    "auto.offset.reset" -> "earliest", //earliest | latest 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) 

stream.print() 

ssc.start() 
ssc.awaitTermination() 

我SBT版本是:

version := "1.0" 
scalaVersion := "2.10.6" 
libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming-kafka-0-10_2.10" % "2.1.0", 
    "org.apache.spark" % "spark-core_2.10" % "2.1.0", 
"org.apache.spark" % "spark-streaming_2.10" % "2.1.0", 
"org.apache.kafka" % "kafka_2.10" % "0.10.2.1" 
) 

謝謝!

+0

您正在運行的Kafka也是0.10.x版嗎? – maasg

+0

在服務器上運行的Kafka版本是0.10.2.1。在libs文件夾中,我有kafka_2.10-0.10.2.1。*文件。該版本與SBT構建配置相同。 – rsmin

回答

0
val broker = "221.181.73.44:19092" 

的默認端口爲,這可能是問題。

"auto.offset.reset" -> "earliest""enable.auto.commit" -> false應始終讓您從主題日誌的開頭讀取,因爲您的偏移不會存儲在任何位置。所以這沒有問題。

此外,我們可以看到您使用的完整命令爲kafka-console-consumer.sh

1

最後,我解決了問題。答案如下:

  1. 該主題中的數據是從控制檯生成器生成的,該控制檯生成器是一個字符串列表。但是,數據格式爲[數組[字節],數組[字節]]。不是[字符串,字符串]。所以如果我使用StringDeserializer,將不會收到數據。

  2. 我從控制檯消費者的源代碼的writeTo學會(consumerRecord:ConsumerRecord [數組[字節],數組[字節]],輸出:爲PrintStream):單位

鍵/在RDDS值可以包含空值。在我的情況下,所有的鍵都是空的。我使用以下代碼獲取數據:

stream = KafkaUtils.createDirectStream [Array [Byte],Array [Byte]](ssc,PreferConsistent,Subscribe [Array [Byte],Array [Byte]](topics,kafkaParams )) stream.map(rdd => new String(Option(rdd.key())。getOrElse(「null」.getBytes))+「||| delemiter |||」+ new String(Option(rdd.value ())。getOrElse(「null」.getBytes)))。print()

+0

你是如何檢查數據流中數據的真實格式的? – Paul

相關問題