Messages are not distributed in round-robin order after increasing the number of partitions in Kafka

Hi, I am new to Kafka. I am using Kafka version 0.10.2 and ZooKeeper version 3.4.9. I have a topic with two partitions and two consumers running. To improve processing speed, I decided to increase the number of partitions to 10 so that I could also increase the number of consumers. So I ran the following command:

./kafka-topics.sh --zookeeper localhost:2181 --alter --topic <topic-name> --partitions 10

I then noticed two strange things:

  1. My consumers are still attached to only the two old partitions. (Expected behavior: the two consumers should be listening to all 10 partitions.)

  2. Messages are being pushed only to the two old partitions; the new partitions are not receiving any messages. (Expected behavior: messages should be distributed across all partitions in a round-robin manner.)

I used the following command to see the details of the partitions:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group-name>
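The partition count that a client actually sees can also be checked programmatically. Below is a minimal sketch assuming the 0.10 Java consumer API and the localhost broker used above (printPartitionCount is a hypothetical helper, not an existing method):

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.PartitionInfo

    // Hypothetical helper: ask the broker how many partitions a topic has right now
    void printPartitionCount(String topicName) {
        Properties props = new Properties()
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props)
        try {
            List<PartitionInfo> partitions = consumer.partitionsFor(topicName)
            println "Topic ${topicName} currently has ${partitions.size()} partitions"
        } finally {
            consumer.close()
        }
    }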

My consumer code:

// Framework imports needed by this class; project-specific classes
// (ConfigLoader, KafkaTopicConfigEntity, KafkaTopicConfigDBService) are
// assumed to come from their own packages.
import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.util.regex.Pattern

class KafkaPollingConsumer implements Runnable { 
    private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class) 
     private static final String TAG = "[KafkaPollingConsumer]" 
     private final KafkaConsumer<String, byte []> kafkaConsumer 
     private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = new HashMap<>() 
     List topicNameList 
     Map kafkaTopicConfigMap = new HashMap<String,Object>() 
     Map kafkaTopicMessageListMap = new HashMap<String,List>() 

     public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex){ 
      logger.debug("{} [Constructor] [Enter] Thread Name {} serverType group Name TopicNameRegex",TAG,Thread.currentThread().getName(),serverType,groupName,topicNameRegex) 
      logger.debug("Populating Property for kafak consumer") 
      Properties kafkaConsumerProperties = new Properties() 
      kafkaConsumerProperties.put("group.id", groupName) 
      kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
      kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer") 
      switch(serverType){ 
       case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() : 
        kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode) 
        kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit) 
        kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset) 
        break 
       case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString() : 
        kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.bulk.kafkaNode) 
        kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit) 
        kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset) 
        kafkaConsumerProperties.put("max.poll.records",10) 
        kafkaConsumerProperties.put("max.poll.interval.ms",900000) 
        kafkaConsumerProperties.put("request.timeout.ms",900000) 
        break 
       default : 
        throw new IllegalArgumentException("Invalid server type") 
      } 
      logger.debug("{} [Constructor] KafkaConsumer Property Populated {}",properties.toString()) 
      kafkaConsumer = new KafkaConsumer<String, byte []>(kafkaConsumerProperties) 
      topicNameList = topicNameRegex.split(Pattern.quote('|')) 
      logger.debug("{} [Constructor] Kafkatopic List {}",topicNameList.toString()) 
      logger.debug("{} [Constructor] Exit",TAG) 
     } 

     private class HandleRebalance implements ConsumerRebalanceListener { 
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 
      } 

      public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 
       if(currentOffsetsMap != null && !currentOffsetsMap.isEmpty()) { 
        logger.debug("{} In onPartitionsRevoked Rebalanced ",TAG) 
        kafkaConsumer.commitSync(currentOffsetsMap) 
       } 
      } 
     } 

     @Override 
     void run() { 
      logger.debug("{} Starting Thread ThreadName {}",TAG,Thread.currentThread().getName()) 
      populateKafkaConfigMap() 
      initializeKafkaTopicMessageListMap() 
      String topicName 
      String consumerClassName 
      String consumerMethodName 
      Boolean isBatchJob 
      Integer batchSize = 0 
      final Thread mainThread = Thread.currentThread() 
      Runtime.getRuntime().addShutdownHook(new Thread() { 
       public void run() { 
        logger.error("{},gracefully shutdowning thread {}",TAG,mainThread.getName()) 
        kafkaConsumer.wakeup() 
        try { 
         mainThread.join() 
        } catch (InterruptedException exception) { 
         logger.error("{} Error : {}",TAG,exception.getStackTrace().join("\n")) 
        } 
       } 
      }) 
      kafkaConsumer.subscribe(topicNameList , new HandleRebalance()) 
      try{ 
       while(true){ 
        logger.debug("{} Starting Consumer with polling time in ms 100",TAG) 
        ConsumerRecords kafkaRecords = kafkaConsumer.poll(100) 
        for(ConsumerRecord record: kafkaRecords){ 
         topicName = record.topic() 
         DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName) 
         consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY) 
         consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY) 
         isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY) 
         logger.debug("Details about Message") 
         logger.debug("Thread {}",mainThread.getName()) 
         logger.debug("Topic {}",topicName) 
         logger.debug("Partition {}",record.partition().toString()) 
         logger.debug("Offset {}",record.offset().toString()) 
         logger.debug("clasName {}",consumerClassName) 
         logger.debug("methodName {}",consumerMethodName) 
         logger.debug("isBatchJob {}",isBatchJob.toString()) 
         if(isBatchJob == true){ 
          batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString()) 
          logger.debug("batchSize {}",batchSize.toString()) 
         } 
         Object message = record.value() 
         logger.debug("message {}",message.toString()) 
         publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName) 
         Thread.sleep(60000) 
         currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() +1)) 
        } 
        logger.debug("{} Commiting Messages to Kafka",TAG) 
        kafkaConsumer.commitSync(currentOffsetsMap) 
       } 
      } 
      catch(InterruptException exception){ 
       logger.error("{} In InterruptException",TAG) 
       logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n")) 
      } 
      catch (WakeupException exception) { 
       logger.error("{} In WakeUp Exception",TAG) 
       logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n")) 
      } 
      catch(Exception exception){ 
       logger.error("{} In Exception",TAG) 
       logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n")) 
      } 
      finally { 
       logger.error("{} In finally commiting remaining offset ",TAG) 
       publishAllKafkaTopicBatchMessages() 
       kafkaConsumer.commitSync(currentOffsetsMap) 
       kafkaConsumer.close() 
       logger.error("{} Exiting Consumer",TAG) 
      } 
     } 


private void publishMessageToConsumers(String consumerClassName,String consumerMethodName,Boolean isBatchJob,Integer batchSize,Object message, String topicName){ 
    logger.debug("{} [publishMessageToConsumer] Enter",TAG) 
    if(isBatchJob == true){ 
     publishMessageToBatchConsumer(consumerClassName, consumerMethodName,batchSize, message, topicName) 
    } 
    else{ 
     publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message) 
    } 
    logger.debug("{} [publishMessageToConsumer] Exit",TAG) 
} 

private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message){ 
    logger.debug("{} [publishMessageToNonBatchConsumer] Enter",TAG) 
    executeConsumerMethod(consumerClassName,consumerMethodName,message) 
    logger.debug("{} [publishMessageToNonBatchConsumer] Exit",TAG) 
} 

private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName){ 
    logger.debug("{} [publishMessageToBatchConsumer] Enter",TAG) 
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName) 
    consumerMessageList.add(message) 
    if(consumerMessageList.size() == batchSize){ 
     logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches",TAG) 
     executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList) 
     consumerMessageList.clear() 
    } 
    kafkaTopicMessageListMap.put(topicName,consumerMessageList) 
    logger.debug("{} [publishMessageToBatchConsumer] Exit",TAG) 
} 

private void populateKafkaConfigMap(){ 
    logger.debug("{} [populateKafkaConfigMap] Enter",TAG) 
    KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance() 
    topicNameList.each { topicName -> 
     DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName) 
     kafkaTopicConfigMap.put(topicName,kafkaTopicDBObject) 
    } 
    logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}",TAG,kafkaTopicConfigMap.toString()) 
    logger.debug("{} [populateKafkaConfigMap] Exit",TAG) 
} 

private void initializeKafkaTopicMessageListMap(){ 
    logger.debug("{} [initializeKafkaTopicMessageListMap] Enter",TAG) 
    topicNameList.each { topicName -> 
     kafkaTopicMessageListMap.put(topicName,[]) 
    } 
    logger.debug("{} [populateKafkaConfigMap] kafkaTopicMessageListMap {}",TAG,kafkaTopicMessageListMap.toString()) 
    logger.debug("{} [initializeKafkaTopicMessageListMap] Exit",TAG) 
} 

private void executeConsumerMethod(String className, String methodName, def messages){ 
    try{ 
     logger.debug("{} [executeConsumerMethod] Enter",TAG) 
     logger.debug("{} [executeConsumerMethod] className {} methodName {} messages {}",TAG,className,methodName,messages.toString()) 
     Class.forName(className)."$methodName"(messages) 
    } catch (Exception exception){ 
     logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName, 
       className, messages.toString(), exception.getStackTrace().join("\n")) 
    } 
    logger.debug("{} [executeConsumerMethod] Exit",TAG) 
} 

private void publishAllKafkaTopicBatchMessages(){ 
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter",TAG) 
    String consumerClassName = null 
    String consumerMethodName = null 
    kafkaTopicMessageListMap.each { topicName,messageList -> 
     DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName) 
     consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY) 
     consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY) 
     logger.debug("{} Pushing message in topic {} className {} methodName {} ",TAG,topicName,consumerClassName,consumerMethodName) 
     if(messageList != null && messageList.size() > 0){ 
      executeConsumerMethod(consumerClassName, consumerMethodName, messageList) 
      messageList.clear() 
      kafkaTopicMessageListMap.put(topicName,messageList) 
     } 
    } 
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit",TAG) 
} 
} // end of class KafkaPollingConsumer

The consumer properties are:

auto.commit.interval.ms = 5000 
auto.offset.reset = earliest 
bootstrap.servers = [localhost:9092] 
check.crcs = true 
client.id = consumer-1 
connections.max.idle.ms = 540000 
enable.auto.commit = false 
exclude.internal.topics = true 
fetch.max.bytes = 52428800 
fetch.max.wait.ms = 500 
fetch.min.bytes = 1 
group.id = t1 
heartbeat.interval.ms = 3000 
interceptor.classes = null 
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
max.partition.fetch.bytes = 1048576 
max.poll.interval.ms = 36000000 
max.poll.records = 10 
metadata.max.age.ms = 300000 
metric.reporters = [] 
metrics.num.samples = 2 
metrics.recording.level = INFO 
metrics.sample.window.ms = 30000 
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] 
receive.buffer.bytes = 65536 
reconnect.backoff.ms = 50 
request.timeout.ms = 36000000 
retry.backoff.ms = 100 
sasl.jaas.config = null 
sasl.kerberos.kinit.cmd = /usr/bin/kinit 
sasl.kerberos.min.time.before.relogin = 60000 
sasl.kerberos.service.name = null 
sasl.kerberos.ticket.renew.jitter = 0.05 
sasl.kerberos.ticket.renew.window.factor = 0.8 
sasl.mechanism = GSSAPI 
security.protocol = PLAINTEXT 
send.buffer.bytes = 131072 
session.timeout.ms = 10000 
ssl.cipher.suites = null 
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
ssl.endpoint.identification.algorithm = null 
ssl.key.password = null 
ssl.keymanager.algorithm = SunX509 
ssl.keystore.location = null 
ssl.keystore.password = null 
ssl.keystore.type = JKS 
ssl.protocol = TLS 
ssl.provider = null 
ssl.secure.random.implementation = null 
ssl.trustmanager.algorithm = PKIX 
ssl.truststore.location = null 
ssl.truststore.password = null 
ssl.truststore.type = JKS 
value.deserializer = class com.custom.kafkaconsumer.deserializer.CustomObjectDeserializer 
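Note that the dump above shows metadata.max.age.ms at its default of 300000 ms, so the consumer group only learns about newly added partitions (and rebalances onto them) every 5 minutes. A minimal sketch of lowering it in the constructor properties, assuming faster discovery is wanted:

    // Sketch only: refresh topic metadata every 30 s so the group notices
    // newly added partitions sooner than the 5 minute default.
    kafkaConsumerProperties.put("metadata.max.age.ms", "30000")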

Producer code:

Properties kafkaProducerProperties = getKafkaProducerProperties(topicName) 
     if(kafkaProducerProperties != null){ 
      priorityKafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<String, byte[]>(kafkaProducerProperties) 
       ProducerRecord<String,byte []> record = new ProducerRecord<String,byte []>(topicName, messageMap) 
      try { 
       priorityKafkaProducer.send(record).get() 
       priorityKafkaProducer.close() 
      } catch (Exception e) { 
       e.printStackTrace() 
      } 

     } 
     else{ 
      throw "Invalid Producer Properties for " + topicName 
     } 
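To see how many partitions the producer's cached metadata currently contains, a check along these lines can be added after the producer is created; this is a minimal sketch reusing the variable names from the snippet above (partitionsFor is part of the Java producer API):

    // Hypothetical check: list the partitions the producer currently knows about.
    // Until the producer refreshes its metadata, this can still show the old count.
    List<org.apache.kafka.common.PartitionInfo> partitionInfoList = priorityKafkaProducer.partitionsFor(topicName)
    println "Producer currently sees ${partitionInfoList.size()} partitions for topic ${topicName}"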

Producer configuration:

acks = 1 
    batch.size = 16384 
    block.on.buffer.full = false 
    bootstrap.servers = [localhost:9092] 
    buffer.memory = 33554432 
    client.id = 
    compression.type = none 
    connections.max.idle.ms = 540000 
    interceptor.classes = null 
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer 
    linger.ms = 0 
    max.block.ms = 60000 
    max.in.flight.requests.per.connection = 5 
    max.request.size = 1048576 
    metadata.fetch.timeout.ms = 60000 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    receive.buffer.bytes = 32768 
    reconnect.backoff.ms = 50 
    request.timeout.ms = 30000 
    retries = 0 
    retry.backoff.ms = 100 
    sasl.jaas.config = null 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.min.time.before.relogin = 60000 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    sasl.mechanism = GSSAPI 
    security.protocol = PLAINTEXT 
    send.buffer.bytes = 131072 
    ssl.cipher.suites = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.endpoint.identification.algorithm = null 
    ssl.key.password = null 
    ssl.keymanager.algorithm = SunX509 
    ssl.keystore.location = null 
    ssl.keystore.password = null 
    ssl.keystore.type = JKS 
    ssl.protocol = TLS 
    ssl.provider = null 
    ssl.secure.random.implementation = null 
    ssl.trustmanager.algorithm = PKIX 
    ssl.truststore.location = null 
    ssl.truststore.password = null 
    ssl.truststore.type = JKS 
    timeout.ms = 30000 
    value.serializer = class com.abhimanyu.kafkaproducer.serializer.CustomObjectSerializer 

My question is: is what I am seeing the expected behavior, or am I missing something?

Answer:

Did you wait 5 minutes (or whatever metadata refresh interval you have configured)?
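A minimal sketch of that suggestion, assuming the 0.10 Java producer shown above, whose refresh interval is controlled by metadata.max.age.ms (300000 ms by default, as in the producer configuration):

    // Sketch only: make the producer refresh topic metadata every 30 s instead of
    // every 5 minutes, so newly added partitions start receiving messages sooner.
    kafkaProducerProperties.put("metadata.max.age.ms", "30000")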


Hi Hans, I have already set the property metadata.max.age.ms = 5000, but it still takes 5 minutes before the new partitions are picked up. Did I set the wrong property? – Abhimanyu


Try setting the producer property topic.metadata.refresh.interval.ms, see http://stackoverflow.com/questions/27722871/where-do-i-define-topic-metadata-refresh-interval-ms –


Thanks for the reply. I think topic.metadata.refresh.ms is for Kafka version 0.8, and I am using Kafka 0.10. I added it to my producer.properties anyway, but it did not work. – Abhimanyu