2017-09-25 215 views
0

我正在對Kafka消費者實現進行集成測試。 我使用wurstmeister/kafka docker鏡像和Apache Kafka客戶端。 當我向某個主題發送「意外」消息時,嗡嗡的場景就是這樣。在RUN模式下,kafkaConsumer.poll(POLLING_TIMEOUT)似乎進入無限循環。當我調試時,它會在我暫停和運行時起作用。Apache kafka消費者對意外消息的無限循環

發送預期的消息(不要在反序列化時拋出異常)時沒有此問題。

這裏是我的卡夫卡docker-compose配置:

kafka: 
    image: wurstmeister/kafka 
    links: 
    - zookeeper 
    ports: 
    - "9092:9092" 
    environment: 
    KAFKA_ADVERTISED_HOST_NAME: localhost 
    KAFKA_ADVERTISED_PORT: 9092 
    KAFKA_CREATE_TOPICS: "ProductLocation:1:1,ProductInformation:1:1,InventoryAvailableToSell:1:1" 
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 
    volumes: 
    - /var/run/docker.sock:/var/run/docker.sock 

Java的通用消費:

@Override 
public Collection<T> consume() { 
    String eventToBePublishedName = ERROR_WHILE_RESETTING_OFFSET; 
    boolean success = false; 

    try { 
     kafkaConsumer.resume(kafkaAssignments); 
     if (isPollingTypeFull) { 
      // dummy poll because its needed before resetting offset. 
      // https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition 
      kafkaConsumer.poll(POLLING_TIMEOUT); 
      resetOffset(); 
     } else if (!offsetGotResetFirstTime) { 
      resetOffset(); 
      offsetGotResetFirstTime = true; 
     } 

     eventToBePublishedName = ERROR_WHILE_POLLING; 

     ConsumerRecords<Object, T> records; 

     List<T> output = new ArrayList<>(); 

     do { 
      records = kafkaConsumer.poll(POLLING_TIMEOUT); 
      records.forEach(cr -> { 
       T val = cr.value(); 
       if (val != null) { 
        output.add(cr.value()); 
       } 
      }); 
     } while (records.count() > 0); 

     eventToBePublishedName = CONSUMING; 
     success = true; 
     kafkaConsumer.pause(kafkaAssignments); 
     return output; 
    } finally { 
     applicationEventPublisher.publishEvent(
       new OperationResultApplicationEvent(
         this, OperationType.ConsumingOfMessages, eventToBePublishedName, success)); 
    } 
} 

的反序列化:

public T deserialize(String topic, byte[] data) { 
    try { 
     JsonNode jsonNode = mapper.readTree(data); 
     JavaType javaType = mapper.getTypeFactory().constructType(getValueClass()); 
     JsonNode value = jsonNode.get("value"); 
     return mapper.readValue(value.toString(), javaType); 
    } catch (IllegalArgumentException | IOException | SerializationException e) { 
     LOGGER.error("Can't deserialize data [" + Arrays.toString(data) 
       + "] from topic [" + topic + "]", e); 
     return null; 
    } 
} 

在我的集成測試,我創建一個話題通過發送給時間標記的主題名稱進行每項測試。這會創建新的主題並使測試成爲無狀態。

這是我如何配置卡夫卡消費者:

Properties properties = new Properties(); 
    properties.put("bootstrap.servers", kafkaConfiguration.getServer()); 
    properties.put("group.id", kafkaConfiguration.getGroupId()); 
    properties.put("key.deserializer", kafkaConfiguration.getKeyDeserializer().getName()); 
    properties.put("value.deserializer", kafkaConfiguration.getValueDeserializer().getName()); 

回答

0

如果你面臨這樣的,只是close消費者開始使用之前使用它們,或者使用pauseresume後。

1

抓住異常並將您承諾的偏移量提前+1以跳過「毒丸」消息。

+0

你說什麼異常?我說民意調查進行到無限循環。這是因爲之前的消費者關於該主題並未關閉。兩者都使用不是線程安全的卡夫卡用戶==>只需關閉用戶就可以修復它。我只是返回null,然後篩選代碼示例中顯示的消費者記錄輪詢的結果。偏移量自動提交。 –

+0

我正在討論如何修復以前的消費者應用程序。我雖然說過以前的消費者因爲無法反序列化意外消息而死亡。在這種情況下,您應該修復該應用程序以捕獲SerDes異常(或任何其他異常),並關閉並退出或將偏移量提前1並繼續。 –

+0

消費者不會死亡(因此不會拋出異常),它在投票過程中一直處於懸掛狀態。這與Apache kafka消費者線程安全性有關。我們需要關閉特定主題和組ID的卡夫卡消費者,然後再創建一個新組標識(針對相同的主題和組ID)。這是問題所在。 –