我已經使用spring-kafka lib實施了Kafka消費者。 我有一個2分區的卡夫卡主題,我也使用ConcurrentKafkaListenerContainerFactory
併發級別設置爲2,因爲每個容器實例應根據spring-kafka documentation從單個分區消耗。Spring-kafka聽衆協會
KafkaMessageListenerContainer接收來自單個線程上所有 主題/分區的所有消息。 ConcurrentMessageListenerContainer代表1個或多個 KafkaMessageListenerContainer以提供多線程消耗。
還有就是我的消費類:
@Component
public class KafkaConsumer {
private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();
@KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
String message = record.value().toString();
Event event = EventFactory.createEvent(message);
String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
// add event to hashMap
LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
if (queue == null) {
queue = new LinkedBlockingQueue<>();
queue.add(event);
hashMap.put(customerId, queue);
} else {
queue.add(event);
}
}
}
正如你看到的我有「的HashMap」在收集我把我的事件,基於消息的相應的隊列「CUSTOMER_ID」屬性。 這種功能在多線程訪問的情況下需要額外的同步,正如我看到的,spring-kafka只爲所有容器創建一個bean實例,而不是爲每個容器創建單獨的bean實例,以避免併發問題。
如何以編程方式更改此邏輯?
我看到修復這個問題的唯一奇怪的方法是使用兩個JVM運行一個單獨的應用程序與單線程使用者,因此使用#receive方法訪問KafkaConsumer類將是單線程的。
thx解釋 – MeetJoeBlack