2017-04-08 99 views
3

我已經建立了一個包含3個經紀人和3個分區的主題的實驗Kafka環境。我有一個生產者和一個消費者。我想修改特定用戶的分區偏移量。我在kafka文檔中讀到kafka中的消費者提交/獲取API可以提交特定的偏移量或獲取消費者讀取的最新偏移量。這裏是鏈接的API:kafka消費者提取API不返回正確的偏移值

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI

我用的代碼從下面的頁面寫我的代碼,以便從一個特定的消費取偏移。但是,獲取API會爲請求的偏移量返回「-1」的值。這裏是示例代碼:
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka

我也讀出第一鏈接「如果沒有偏移與消費羣下一個主題分區的經紀人不設置錯誤代碼(相關,因爲它是不是真的錯誤),但返回空元數據並將偏移字段設置爲-1。「

但是,我已經產生了一些消息,我的消費者已經消費了消息並輸出了每個讀取消息的偏移量。

如果有人能幫到這個,我會非常樂意。我想知道我的代碼的哪個部分是錯誤的。或者也許這個API有問題。請不要猶豫,提出任何有用的意見。我的代碼與我提供的鏈接中的代碼完全相同。但是,如果你需要看我的代碼,請告訴我把它放在這裏。

卡夫卡版本是0.10.2.0

我卡夫卡的配置是:

經紀人1:端口9093

經紀人2:端口9094

經紀人3:端口9095

主題:「testpic3」

.......... ............

消費者配置:

props.put("group.id", "test"); 

props.put("client.id", "MyConsumer"); 

................

這裏是我的代碼:

public class KafkaOffsetManage { 

public static void main(String[] args) { 


    BlockingChannel channel = new BlockingChannel("localhost", 9095, 
      BlockingChannel.UseDefaultBufferSize(), 
      BlockingChannel.UseDefaultBufferSize(), 
      5000 /* read timeout in millis */); 
    channel.connect(); 
    final String MY_GROUP = "test"; 
    final String MY_CLIENTID = "MyConsumer"; 
    int correlationId = 0; 
    final TopicAndPartition testPartition0 = new TopicAndPartition("testpic3",0); 
    final TopicAndPartition testPartition1 = new TopicAndPartition("testpic3",1); 
    final TopicAndPartition testPartition2 = new TopicAndPartition("testpic3",2); 
    channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID)); 
    ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer()); 
    System.out.println("+++++++++++++++++++++++++++"); 

    System.out.println(metadataResponse.errorCode()); 

    if (metadataResponse.errorCode() == ErrorMapping.NoError()) { 
     Broker offsetManager = metadataResponse.coordinator(); 
     // if the coordinator is different, from the above channel's host then reconnect 
     channel.disconnect(); 
     channel = new BlockingChannel(offsetManager.host(), offsetManager.port(), 
       BlockingChannel.UseDefaultBufferSize(), 
       BlockingChannel.UseDefaultBufferSize(), 
       5000 /* read timeout in millis */); 
     channel.connect(); 
     System.out.println("Connected to Offset Manager"); 
     System.out.println(offsetManager.host() + ", Port:"+ offsetManager.port()); 

    } else { 
     // retry (after backoff) 
    } 



    // How to fetch offsets 


    List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>(); 
    partitions.add(testPartition0); 
    //partitions.add(testPartition1); 
    OffsetFetchRequest fetchRequest = new OffsetFetchRequest(
      MY_GROUP, 
      partitions, 
      (short) 2 /* version */, // version 1 and above fetch from Kafka, version 0 fetches from ZooKeeper 
      correlationId, 
      MY_CLIENTID); 
    try { 
     channel.send(fetchRequest.underlying()); 
     OffsetFetchResponse fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer()); 
     OffsetMetadataAndError result = fetchResponse.offsets().get(testPartition0); 

     short offsetFetchErrorCode = result.error(); 
     if (offsetFetchErrorCode == ErrorMapping.NotCoordinatorForConsumerCode()) { 
      channel.disconnect(); 
      // Go to step 1 and retry the offset fetch 
     } else if (offsetFetchErrorCode == ErrorMapping.OffsetsLoadInProgressCode()) { 
      // retry the offset fetch (after backoff) 
     } else { 
      long retrievedOffset = result.offset(); 
      String retrievedMetadata = result.metadata(); 
      System.out.println("The retrieved offset is:"+ Long.toString(retrievedOffset)); 
      System.out.println(retrievedMetadata); 
      System.out.println(result.toString()); 
     } 
    } 
    catch (Exception e) { 
     channel.disconnect(); 
     // Go to step 1 and then retry offset fetch after backoff 
    } 
} 
} 

代碼的輸出是在這裏:

+++++++++++++++++++++++++++ 
0 

Connected to Offset Manager 

user-virtual-machine, Port:9093 
------------------------ 
The retrieved offset is:-1 

OffsetMetadataAndError[-1,,3] 

Process finished with exit code 0 

一個奇怪的事情我有關卡夫卡的依賴關係。當我加入這個依賴,我的代碼不會在節目中認識了一些類:

<artifactId>kafka_2.10</artifactId> 
<version>0.10.2.0</version> 

班「ConsumerMetadataRequest」和「ConsumerMetadataResponse」無法識別。

所以我添加這種依賴性,而不是:

<artifactId>kafka_2.10</artifactId> 
<version>0.8.2.0</version> 

謝謝

+0

請確保提供代碼的相關部分。您所包含的鏈接可能會在某些時候停止工作,使您的問題變得毫無用處。 –

+0

非常感謝Patrick。這裏是我的代碼 – Farhad9660

回答

0

我假設你已經添加

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.10.2.0</version> 
</dependency> 

爲你的依賴。這是卡夫卡本身。你需要消耗/生產卡夫卡0.10.2什麼是:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.10.2.0</version> 
</dependency> 

對於消費(並給予消費者的法操縱偏移)使用KafkaConsumer它有詳細的javadoc和比Committing and fetching consumer offsets in Kafka更方便類。

除此之外的是,如果你仍然想使用例如,你鏈接,你有可能是代碼,提出問題:

List<TopicAndPartition> partitions = new ArrayList<TopicAndPartition>(); 
    partitions.add(testPartition0); 

只添加一個分區,有可能是在這個分區上有沒有消息(你有3個分區,所以你發送的消息可能會在另外兩個消息中出現)。在Kafka中,每個分區是分開的,並且每個分區的消費者組具有不同的偏移量。

+0

親愛的reynev,我已經添加了對kafka客戶端的依賴關係。但問題依然存在。儘管如此,你可能有一個關於 依賴關係的觀點。一件奇怪的事是關於卡夫卡的依賴。 kafka_2.10 0.10.2.0當我添加 上述依賴項我的代碼無法識別程序中的某些類。類:ConsumerMetadataRequest和ConsumerMetadataResponse不被識別。所以我添加了這種依賴關係: kafka_2.10 0.8.2.0 Farhad9660

+0

該代碼適用於此依賴項,但輸出不正確。我不確定底層問題是否依賴。無論如何,我不能添加卡夫卡版本0.10.2.0的依賴關係。此外,分區沒有問題,因爲我在3個分區上打印了消息,並且所有消息都已寫入。我也試過partitions.add(testPartition1)和partitions.add(testPartition2)。 – Farhad9660

+0

我的建議是將依賴關係從'kadka_2.10'改爲'kafka-clients',這是Java API,用於想要使用Kafka的應用程序。我將用一個示例爲您編寫一些代碼 – reynev