我正在對Kafka消費者實現進行集成測試。 我使用wurstmeister/kafka docker鏡像和Apache Kafka客戶端。 當我向某個主題發送「意外」消息時,嗡嗡的場景就是這樣。在RUN模式下,kafkaConsumer.poll(POLLING_TIMEOUT)
似乎進入無限循環。當我調試時,它會在我暫停和運行時起作用。Apache kafka消費者對意外消息的無限循環
發送預期的消息(不要在反序列化時拋出異常)時沒有此問題。
這裏是我的卡夫卡docker-compose
配置:
kafka:
image: wurstmeister/kafka
links:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ADVERTISED_PORT: 9092
KAFKA_CREATE_TOPICS: "ProductLocation:1:1,ProductInformation:1:1,InventoryAvailableToSell:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
Java的通用消費:
@Override
public Collection<T> consume() {
String eventToBePublishedName = ERROR_WHILE_RESETTING_OFFSET;
boolean success = false;
try {
kafkaConsumer.resume(kafkaAssignments);
if (isPollingTypeFull) {
// dummy poll because its needed before resetting offset.
// https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition
kafkaConsumer.poll(POLLING_TIMEOUT);
resetOffset();
} else if (!offsetGotResetFirstTime) {
resetOffset();
offsetGotResetFirstTime = true;
}
eventToBePublishedName = ERROR_WHILE_POLLING;
ConsumerRecords<Object, T> records;
List<T> output = new ArrayList<>();
do {
records = kafkaConsumer.poll(POLLING_TIMEOUT);
records.forEach(cr -> {
T val = cr.value();
if (val != null) {
output.add(cr.value());
}
});
} while (records.count() > 0);
eventToBePublishedName = CONSUMING;
success = true;
kafkaConsumer.pause(kafkaAssignments);
return output;
} finally {
applicationEventPublisher.publishEvent(
new OperationResultApplicationEvent(
this, OperationType.ConsumingOfMessages, eventToBePublishedName, success));
}
}
的反序列化:
public T deserialize(String topic, byte[] data) {
try {
JsonNode jsonNode = mapper.readTree(data);
JavaType javaType = mapper.getTypeFactory().constructType(getValueClass());
JsonNode value = jsonNode.get("value");
return mapper.readValue(value.toString(), javaType);
} catch (IllegalArgumentException | IOException | SerializationException e) {
LOGGER.error("Can't deserialize data [" + Arrays.toString(data)
+ "] from topic [" + topic + "]", e);
return null;
}
}
在我的集成測試,我創建一個話題通過發送給時間標記的主題名稱進行每項測試。這會創建新的主題並使測試成爲無狀態。
這是我如何配置卡夫卡消費者:
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaConfiguration.getServer());
properties.put("group.id", kafkaConfiguration.getGroupId());
properties.put("key.deserializer", kafkaConfiguration.getKeyDeserializer().getName());
properties.put("value.deserializer", kafkaConfiguration.getValueDeserializer().getName());
你說什麼異常?我說民意調查進行到無限循環。這是因爲之前的消費者關於該主題並未關閉。兩者都使用不是線程安全的卡夫卡用戶==>只需關閉用戶就可以修復它。我只是返回null,然後篩選代碼示例中顯示的消費者記錄輪詢的結果。偏移量自動提交。 –
我正在討論如何修復以前的消費者應用程序。我雖然說過以前的消費者因爲無法反序列化意外消息而死亡。在這種情況下,您應該修復該應用程序以捕獲SerDes異常(或任何其他異常),並關閉並退出或將偏移量提前1並繼續。 –
消費者不會死亡(因此不會拋出異常),它在投票過程中一直處於懸掛狀態。這與Apache kafka消費者線程安全性有關。我們需要關閉特定主題和組ID的卡夫卡消費者,然後再創建一個新組標識(針對相同的主題和組ID)。這是問題所在。 –