
I am using a single-node Kafka broker (0.10.2) and a single-node ZooKeeper server (3.4.9). I have one consumer server (single core, 1.5 GB RAM). Whenever I run a process with 5 or more threads, my consumer's threads are killed after the Kafka consumer throws java.lang.OutOfMemoryError: Direct buffer memory, with these exceptions:

  1. Exception 1

    java.lang.OutOfMemoryError: Java heap space
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

  2. Exception 2 (after which the thread is killed)

    Uncaught exception in kafka-coordinator-heartbeat-thread | topic1:
    java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Bits.java:693)
        at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
        at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
        at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
        at sun.nio.ch.IOUtil.read(IOUtil.java:195)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

I googled it and used the JVM parameters below, but the same exceptions are still being raised:

    -XX:MaxDirectMemorySize=768m

    -Xms512m

How can I fix this issue? Are any other JVM parameter tweaks needed?

My Kafka consumer code is:

    import com.mongodb.DBObject 
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener 
    import org.apache.kafka.clients.consumer.ConsumerRecord 
    import org.apache.kafka.clients.consumer.ConsumerRecords 
    import org.apache.kafka.clients.consumer.KafkaConsumer 
    import org.apache.kafka.clients.consumer.OffsetAndMetadata 
    import org.apache.kafka.clients.consumer.OffsetCommitCallback 
    import org.apache.kafka.common.TopicPartition 
    import org.apache.kafka.common.errors.InterruptException 
    import org.apache.kafka.common.errors.WakeupException 
    import org.slf4j.Logger 
    import org.slf4j.LoggerFactory 
    import java.util.regex.Pattern 
    
    class KafkaPollingConsumer implements Runnable { 
    private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class) 
    private static final String TAG = "[KafkaPollingConsumer]" 
    private final KafkaConsumer<String, byte []> kafkaConsumer 
    private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = new HashMap<>() 
    List topicNameList 
    Map kafkaTopicConfigMap = new HashMap<String,Object>() 
    Map kafkaTopicMessageListMap = new HashMap<String,List>() 
    Boolean isRebalancingTriggered = false 
    private final Long REBALANCING_SLEEP_TIME = 1000 
    
    public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex, Integer batchSize, Integer maxPollTime, Integer requestTime){ 
        logger.debug("{} [Constructor] [Enter] Thread Name {} serverType group Name TopicNameRegex",TAG,Thread.currentThread().getName(),serverType,groupName,topicNameRegex) 
        logger.debug("Populating Property for kafak consumer") 
        logger.debug("BatchSize {}",batchSize) 
        Properties kafkaConsumerProperties = new Properties() 
        kafkaConsumerProperties.put("group.id", groupName) 
        kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
        kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumerv2.deserializer.CustomObjectDeserializer") 
        switch(serverType){ 
         case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() : 
          kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode) 
          kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit) 
          kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset) 
          break 
         case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString() : 
          kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.bulk.kafkaNode) 
          kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit) 
          kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset) 
          kafkaConsumerProperties.put("max.poll.records",1) 
          kafkaConsumerProperties.put("max.poll.interval.ms",600000) 
          kafkaConsumerProperties.put("request.timeout.ms",600005) 
          break 
         default : 
          throw "Invalid server type" 
          break 
        } 
        logger.debug("{} [Constructor] KafkaConsumer Property Populated {}",properties.toString()) 
        kafkaConsumer = new KafkaConsumer<String, byte []>(kafkaConsumerProperties) 
        topicNameList = topicNameRegex.split(Pattern.quote('|')) 
        logger.debug("{} [Constructor] Kafkatopic List {}",topicNameList.toString()) 
        logger.debug("{} [Constructor] Exit",TAG) 
    } 
    
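    // Rebalance listener: before partitions are revoked, flush any batched 
    // messages and commit their offsets so they are not re-delivered afterwards 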
    private class HandleRebalance implements ConsumerRebalanceListener { 
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) { 
         logger.error('{} In onPartitionsAssigned setting isRebalancingTriggered to false',TAG) 
         isRebalancingTriggered = false 
        } 
    
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { 
         logger.error("{} In onPartitionsRevoked setting osRebalancingTriggered to true",TAG) 
         isRebalancingTriggered = true 
         publishAllKafkaTopicBatchMessages() 
         commitOffset() 
    
        } 
    } 
    
    private class AsyncCommitCallBack implements OffsetCommitCallback{ 
    
        @Override 
        void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { 
    
        } 
    } 
    
    @Override 
    void run() { 
        logger.debug("{} Starting Thread ThreadName {}",TAG,Thread.currentThread().getName()) 
        populateKafkaConfigMap() 
        initializeKafkaTopicMessageListMap() 
        String topicName 
        String consumerClassName 
        String consumerMethodName 
        Boolean isBatchJob 
        Integer batchSize = 0 
        final Thread mainThread = Thread.currentThread() 
        Runtime.getRuntime().addShutdownHook(new Thread() { 
         public void run() { 
          logger.error("{},gracefully shutdowning thread {}",TAG,mainThread.getName()) 
          kafkaConsumer.wakeup() 
          try { 
           mainThread.join() 
          } catch (InterruptedException exception) { 
           logger.error("{} Error : {}",TAG,exception.getStackTrace().join("\n")) 
          } 
         } 
        }) 
        kafkaConsumer.subscribe(topicNameList , new HandleRebalance()) 
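        // Poll loop: skip poll() while a rebalance is in flight; otherwise route 
        // each record to the consumer class/method configured for its topic 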
        try{ 
         while(true){ 
          logger.debug("{} Starting Consumer with polling time in ms 100",TAG) 
          ConsumerRecords kafkaRecords 
          if(isRebalancingTriggered == false) { 
           kafkaRecords = kafkaConsumer.poll(100) 
          } 
          else{ 
           logger.error("{} in rebalancing going to sleep",TAG) 
           Thread.sleep(REBALANCING_SLEEP_TIME) 
           continue 
          } 
          for(ConsumerRecord record: kafkaRecords){ 
           if(isRebalancingTriggered == true){ 
            break 
           } 
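           // Track the next offset to commit for this partition (offset + 1) 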
           currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() +1)) 
           topicName = record.topic() 
           DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName) 
           consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY) 
           consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY) 
           isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY) 
           logger.debug("Details about Message") 
           logger.debug("Thread {}",mainThread.getName()) 
           logger.debug("Topic {}",topicName) 
           logger.debug("Partition {}",record.partition().toString()) 
           logger.debug("Offset {}",record.offset().toString()) 
           logger.debug("clasName {}",consumerClassName) 
           logger.debug("methodName {}",consumerMethodName) 
           logger.debug("isBatchJob {}",isBatchJob.toString()) 
           Object message = record.value() 
           logger.debug("message {}",message.toString()) 
           if(isBatchJob == true){ 
            prepareMessagesBatch(topicName,message) 
            //batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString()) 
            //logger.debug("batchSize {}",batchSize.toString()) 
           } 
           else{ 
            publishMessageToNonBatchConsumer(consumerClassName,consumerMethodName,message) 
           } 
           //publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName) 
           //try { 
           // kafkaConsumer.commitAsync(currentOffsetsMap,new AsyncCommitCallBack()) 
           logger.debug("{} Commiting Messages to Kafka",TAG) 
           //} 
           /*catch(Exception exception){ 
            kafkaConsumer.commitSync(currentOffsetsMap) 
            currentOffsetsMap.clear() 
            logger.error("{} Error while commiting async so commiting in sync {}",TAG,exception.getStackTrace().join("\n")) 
           }*/ 
          } 
          commitOffset() 
          publishAllKafkaTopicBatchMessages() 
         } 
        } 
        catch(InterruptException exception){ 
         logger.error("{} In InterruptException",TAG) 
         logger.error("{} In Exception exception message {}",TAG,exception.getMessage()) 
         logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n")) 
        } 
        catch (WakeupException exception) { 
         logger.error("{} In WakeUp Exception",TAG) 
         logger.error("{} In Exception exception message {}",TAG,exception.getMessage()) 
         logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n")) 
        } 
        catch(Exception exception){ 
         logger.error("{} In Exception",TAG) 
         logger.error("{} In Exception exception message {}",TAG,exception.getMessage()) 
         logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n")) 
        } 
        finally { 
         logger.error("{} In finally commiting remaining offset ",TAG) 
         publishAllKafkaTopicBatchMessages() 
         //kafkaConsumer.commitSync(currentOffsetsMap) 
         kafkaConsumer.close() 
         logger.error("{} Exiting Consumer",TAG) 
        } 
    } 
    
    private void commitOffset(){ 
        logger.debug("{} [commitOffset] Enter") 
        logger.debug("{} currentOffsetMap {}",currentOffsetsMap.toString()) 
        if(currentOffsetsMap.size() > 0) { 
         kafkaConsumer.commitSync(currentOffsetsMap) 
         currentOffsetsMap.clear() 
        } 
        logger.debug("{} [commitOffset] Exit") 
    
    } 
    
    private void publishMessageToConsumers(String consumerClassName,String consumerMethodName,Boolean isBatchJob,Integer batchSize,Object message, String topicName){ 
        logger.debug("{} [publishMessageToConsumer] Enter",TAG) 
        if(isBatchJob == true){ 
         publishMessageToBatchConsumer(consumerClassName, consumerMethodName,batchSize, message, topicName) 
        } 
        else{ 
         publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message) 
        } 
        logger.debug("{} [publishMessageToConsumer] Exit",TAG) 
    } 
    
    private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message){ 
        logger.debug("{} [publishMessageToNonBatchConsumer] Enter",TAG) 
        executeConsumerMethod(consumerClassName,consumerMethodName,message) 
        logger.debug("{} [publishMessageToNonBatchConsumer] Exit",TAG) 
    } 
    
    private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName){ 
        logger.debug("{} [publishMessageToBatchConsumer] Enter",TAG) 
        List consumerMessageList = kafkaTopicMessageListMap.get(topicName) 
        consumerMessageList.add(message) 
        if(consumerMessageList.size() == batchSize){ 
         logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches",TAG) 
         executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList) 
         consumerMessageList.clear() 
        } 
        kafkaTopicMessageListMap.put(topicName,consumerMessageList) 
        logger.debug("{} [publishMessageToBatchConsumer] Exit",TAG) 
    } 
    
    private void populateKafkaConfigMap(){ 
        logger.debug("{} [populateKafkaConfigMap] Enter",TAG) 
        KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance() 
        topicNameList.each { topicName -> 
         DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName) 
         kafkaTopicConfigMap.put(topicName,kafkaTopicDBObject) 
        } 
        logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}",TAG,kafkaTopicConfigMap.toString()) 
        logger.debug("{} [populateKafkaConfigMap] Exit",TAG) 
    } 
    
    private void initializeKafkaTopicMessageListMap(){ 
        logger.debug("{} [initializeKafkaTopicMessageListMap] Enter",TAG) 
        topicNameList.each { topicName -> 
         kafkaTopicMessageListMap.put(topicName,[]) 
        } 
        logger.debug("{} [populateKafkaConfigMap] kafkaTopicMessageListMap {}",TAG,kafkaTopicMessageListMap.toString()) 
        logger.debug("{} [initializeKafkaTopicMessageListMap] Exit",TAG) 
    } 
    
    private void executeConsumerMethod(String className, String methodName, def messages){ 
        try{ 
         logger.debug("{} [executeConsumerMethod] Enter",TAG) 
         logger.debug("{} [executeConsumerMethod] className {} methodName {} messages {}",TAG,className,methodName,messages.toString()) 
         Class.forName(className)."$methodName"(messages) 
        } catch (Exception exception){ 
         logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName, 
           className, messages.toString(), exception.getStackTrace().join("\n")) 
        } 
        logger.debug("{} [executeConsumerMethod] Exit",TAG) 
    } 
    
    private void publishAllKafkaTopicBatchMessages(){ 
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter",TAG) 
        String consumerClassName = null 
        String consumerMethodName = null 
        kafkaTopicMessageListMap.each { topicName, messageList -> 
         if (messageList != null && messageList.size() > 0) { 
          DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName) 
          consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY) 
          consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY) 
          logger.debug("{} Pushing message in topic {} className {} methodName {} ", TAG, topicName, consumerClassName, consumerMethodName) 
          if (messageList != null && messageList.size() > 0) { 
           executeConsumerMethod(consumerClassName, consumerMethodName, messageList) 
           messageList.clear() 
           kafkaTopicMessageListMap.put(topicName, messageList) 
    
          } 
         } 
        } 
        logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit",TAG) 
    } 
    
    private void prepareMessagesBatch(String topicName,Object message){ 
        logger.debug("{} [prepareMessagesBatch] Enter",TAG) 
        logger.debug("{} [prepareMessagesBatch] preparing batch for topic {}",TAG,topicName) 
        logger.debug("{} [prepareMessagesBatch] preparting batch for message {}",TAG,message.toString()) 
        List consumerMessageList = kafkaTopicMessageListMap.get(topicName) 
        consumerMessageList.add(message) 
        kafkaTopicMessageListMap.put(topicName,consumerMessageList) 
    
    } 
    

    }


Can you see how much data is pending/unconsumed? With Kafka, you need to be able to buffer all of that data in memory. What message rate are you producing at? What happens if you produce at a much lower rate?

Answer


A Kafka consumer handles the data backlog by means of the following two parameters:

max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances in order to reassign the partitions to another member.
The default value is 300000.

max.poll.records
The maximum number of records returned in a single call to poll().
The default value is 500.

Failing to set these two parameters according to your requirements can lead to polling the maximum amount of data, which the consumer may not be able to process with the available resources, resulting in OutOfMemory errors or occasional failures to commit the consumer offset. Hence, it is always advisable to set the max.poll.records and max.poll.interval.ms parameters, as in the sketch below.
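For illustration, here is a minimal Groovy sketch of a consumer configured with both parameters. The broker address, group id, and concrete values are assumptions for the example, not recommendations; size max.poll.records to what your machine can actually hold per poll:

    import org.apache.kafka.clients.consumer.KafkaConsumer

    Properties props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // assumed broker address
    props.put("group.id", "topic1-group")              // assumed group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    // Cap how many records a single poll() may pull into memory at once.
    props.put("max.poll.records", 100)                 // illustrative; default is 500
    // Upper bound on the gap between poll() calls before the group rebalances.
    props.put("max.poll.interval.ms", 300000)          // illustrative; default is 300000
    KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props)

Together these bound each poll in both volume and allowed processing time, so a slow handler degrades into smaller batches instead of exhausting heap or direct-buffer memory.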

In the case of your code, the KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() branch is missing both parameters, which may be the cause of the out-of-memory problem during polling.
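Concretely, that Priority branch could set both parameters alongside its existing properties. A sketch (the values are illustrative only; tune them to what the single-core, 1.5 GB machine can process):

     case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() : 
      kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode) 
      kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit) 
      kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset) 
      // Bound the per-poll batch so the consumer is never handed more records 
      // than it can buffer in memory (illustrative value). 
      kafkaConsumerProperties.put("max.poll.records", 50) 
      // Allow enough processing time per batch before the coordinator considers 
      // the consumer dead and triggers a rebalance (illustrative value). 
      kafkaConsumerProperties.put("max.poll.interval.ms", 300000) 
      break 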
