2016-02-26 107 views
2

我是新來的星火道歉問這樣一個問題,從閱讀卡夫卡經紀人主題的特定分區的數據。我有一個用例,我想在Spark Streaming的幫助下從主題的特定分區讀取數據。我正在使用Spark Java API來做所有的事情。通過星火流

我已創建了複製因子2和5個分區一個名爲test的話題。希望在火花流Kafka集成指南的幫助下,我能夠完成所有這些工作,比如創建一個JavaStreamingContext對象,創建一個到Kafka代理的直接流,並能夠讀取所有分區中的所有消息。

但是還是我的使用情況不符合,我要讀卡夫卡的經紀人,而不是從所有分區中的所有消息的主題的特定分區中的唯一消息。

回答

1

你應該能夠讀取特定分區從特定的偏移使用下面的代碼。

Map<TopicAndPartition, Long> consumerOffsets = new HashMap<TopicAndPartition, Long>(); 
TopicAndPartition p1 = new TopicAndPartition("yourtopic","yourpartition"); 
consumerOffsets.put(p1,offset); 

JavaInputDStream<String> messages = KafkaUtils.createDirectStream(
     jssc, 
     String.class, 
     String.class, 
     StringDecoder.class, 
     StringDecoder.class, 
     String.class, 
     kafkaParams, 
     consumerOffsetsLong, 
     new Function<MessageAndMetadata<String, String>, String>() { 
      public String call(MessageAndMetadata<String, String> msgAndMeta) throws Exception { 
       return msgAndMeta.message(); 
      } 
     } 
);