2017-04-07 51 views
1

我正在玩Spark Streaming和Kafka(使用Scala API),並且希望通過Spark Streaming從一組Kafka主題中讀取消息。kafka和Spark:通過API獲取主題的第一個偏移量

下面的方法:

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest") 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 

讀取卡夫卡到最新的偏移,但不給我,我需要(因爲我從一組主題閱讀的元數據,我需要爲每個消息,我讀了這個話題),但這種其他方法KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler)明確希望我沒有的偏移量。

我知道有這個shell命令給你最後的偏移量。

kafka-run-class.sh kafka.tools.GetOffsetShell 
    --broker-list <broker>: <port> 
    --topic <topic-name> --time -1 --offsets 1 

KafkaCluster.scala是對於曾經是公共開發商,讓您正是我想什麼的API。

提示?

回答

1

您可以從GetOffsetShell.scala kafka API documentation

val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId) 
val topicAndPartition = TopicAndPartition(topic, partitionId) 
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets))) 
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets 
使用代碼

或者你可以用獨特的groupId創造新的消費和使用它獲得第一偏移

val consumer=new KafkaConsumer[String, String](createConsumerConfig(config.brokerList)) 
consumer.partitionsFor(config.topic).foreach(pi => { 
     val topicPartition = new TopicPartition(pi.topic(), pi.partition()) 

     consumer.assign(List(topicPartition)) 
     consumer.seekToBeginning() 
     val firstOffset = consumer.position(topicPartition) 
... 
+0

謝謝@Natalia! 在第一個片段..什麼是時間?什麼是'時間',什麼是'nOffset'? – salvob

+0

哦,我明白了,你從[這裏]獲得了第一個片段(https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/tools/GetOffsetShell.scala) – salvob

+0

@salvob這個問題的答案是否正確?如果是這樣,請標記爲已回答的問題,否則,如果您能告訴我們您是如何解決問題的,我將不勝感激:) – dbustosp

相關問題