根據Spring Kafka文檔設置卡夫卡ConsumerFactory。 但是,groupId似乎沒有被使用。也許我只是把整件事情弄錯了,所以我想讓你知道我的經歷。卡夫卡消費者未加入自定義組標識
這是我的配置似乎並沒有工作:
@Bean
ConsumerFactory<String, KafkaEvent> kafkaEventConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
getConsumerProperties(),
new StringDeserializer(),
new JsonDeserializer<>(KafkaEvent.class));
}
Map<String, Object> getConsumerProperties() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // TODO
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000);
return props;
}
我有配置這樣的@KafkaEventListener
,無需再次明確指定的groupId:
@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC)
public class KafkaEventListener {
@Autowired
private ConsumerFactory<String, KafkaEvent> consumerFactory;
@KafkaHandler
public void listenTo(@Payload KafkaEvent event) {
LOGGER.error(LogMarker.KAFKA, consumerFactory.getConfigurationProperties().toString());
}
}
我也能看到我的groupId「myGroupId」包含在上面記錄的錯誤日誌中。然而,令我懷疑的是一些ConsumerCoordinator的DEBUG日誌記錄,它總是聲明加入一個不同的groupId,我有點擔心這看起來是正確的。
2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.906 ( ) INFO consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [] for group org.springframework.kafka.KafkaListenerEndpointContainer#0
2017-09-04 15:28:13.907 ( ) INFO consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [my-topic-0] for group org.springframework.kafka.KafkaListenerEndpointContainer#0
同樣在Spring啓動時輸出ConsumerConfig。我可以看到groupId是錯誤的,但其他屬性被正確接管。
據我瞭解,我可以通過在ConsumerFactory上設置groupId或使用spring.kafka.consumer.group-id
在application.properties中設置它來設置全局groupId。儘管兩種變體都不起作用。
只有當我配置使用的groupId @KafkaListener
註釋的LOG指出,消費者加入了正確的組:
2017-09-04 15:38:30.787 ( ) DEBUG consumer.internals.AbstractCoordinator - Received successful JoinGroup response for group myGroupId: [email protected]
有了這個配置:
@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC, groupId = "myGroupId")
我們正在使用Spring 2.0.0啓動.M3(因此,Spring Kafka 2.0.0.M3)