1
我正在玩Spark Streaming和Kafka(使用Scala API),並且希望通過Spark Streaming從一組Kafka主題中讀取消息。kafka和Spark:通過API獲取主題的第一個偏移量
下面的方法:
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
讀取卡夫卡到最新的偏移,但不給我,我需要(因爲我從一組主題閱讀的元數據,我需要爲每個消息,我讀了這個話題),但這種其他方法KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler)
明確希望我沒有的偏移量。
我知道有這個shell命令給你最後的偏移量。
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list <broker>: <port>
--topic <topic-name> --time -1 --offsets 1
和KafkaCluster.scala
是對於曾經是公共開發商,讓您正是我想什麼的API。
提示?
謝謝@Natalia! 在第一個片段..什麼是時間?什麼是'時間',什麼是'nOffset'? – salvob
哦,我明白了,你從[這裏]獲得了第一個片段(https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/tools/GetOffsetShell.scala) – salvob
@salvob這個問題的答案是否正確?如果是這樣,請標記爲已回答的問題,否則,如果您能告訴我們您是如何解決問題的,我將不勝感激:) – dbustosp