2016-03-15 47 views
3

我正在使用Kafka 0.8.2,並且在我的客戶中出現錯誤,提示「offset commit failed with ...」。當看着主題「__consumer_offsets」。我看到它有50個分區數。這是正常的嗎?我只能通過刪除所有Kafka日誌並重新啓動我的Kafka服務器來解決此問題。有一種方法可以在達到特定數量的分區時刪除此主題,或者我承諾我的偏移量錯誤嗎?kafka __consumer_offsets主題有過多的分區計數

這是我如何將我的補償:

public void commitOffsets(BlockingChannel channel, String topic, String groupid, int partition, String clientName, int corrilationid, long offset) throws Exception{ 

    if (commitTryCount > 100){ 
     throw new Exception("Offset commit failed with " + channel.host()); 
    } 

    long now = System.currentTimeMillis(); 
    Map<TopicAndPartition, OffsetAndMetadata> offsets = new LinkedHashMap<TopicAndPartition, OffsetAndMetadata>(); 
    //for (int i = 0; i < this.totalPartitions; i++){ 
     TopicAndPartition topicPartition = new TopicAndPartition(topic, partition); 
     offsets.put(topicPartition, new OffsetAndMetadata(offset, topic, now)); 
    //}  

    //initialize offset commit 
    OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupid, offsets, corrilationid, clientName, (short) 1); 
    channel.send(commitRequest.underlying()); 
    OffsetCommitResponse commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer()); 
    if (commitResponse.hasError()){   
     for (Object partitionErrorCode: commitResponse.errors().values()){ 
      if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.OffsetMetadataTooLargeCode()){ 
       //reduce the size of the metadata and retry 
       offset--; 
       commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset); 
       commitTryCount++; 
      } else if (Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.NotCoordinatorForConsumerCode() 
        || Short.parseShort(partitionErrorCode.toString()) == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) { 
       //discover new coordinator and retry 
       int newCorrilation = corrilationid; 
       newCorrilation++; 
       this.channel = discoverChannel(channel.host(), port, groupid, clientName, newCorrilation); 
       commitOffsets(this.channel, topic, groupid, partition, clientName, newCorrilation, offset); 
       commitTryCount++; 
      } else{ 
       //retry 
       commitOffsets(channel, topic, groupid, partition, clientName, corrilationid, offset); 
       commitTryCount++; 
      }//end of else    
     }//end of for 
    }//end of if 
}//end of method 

回答

1

我想通了之後,我貼我的代碼。當提交成功時,我忘了將變量「commitTryCount」設置爲0。我仍然想知道__consumer_offsets話題有50個分區是否正常?

+0

50是'offsetsets.topic.num.partitions' config的默認值,所以沒關係。你可以看看默認[這裏](http://kafka.apache.org/documentation.html) – serejja

0

是的,消費者偏移的50個分區是默認值。要更改,請設置offsets.topic.num.partitions屬性。

相關問題