2017-09-21 37 views
0

我已經使用spring-kafka lib實施了Kafka消費者。 我有一個2分區的卡夫卡主題,我也使用ConcurrentKafkaListenerContainerFactory併發級別設置爲2,因爲每個容器實例應根據spring-kafka documentation從單個分區消耗。Spring-kafka聽衆協會

KafkaMessageListenerContainer接收來自單個線程上所有 主題/分區的所有消息。 ConcurrentMessageListenerContainer代表1個或多個 KafkaMessageListenerContainer以提供多線程消耗。

還有就是我的消費類:

@Component 
public class KafkaConsumer { 
    private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>(); 

    @KafkaListener(topics = "${kafka.topic}", groupId = "events_group") 
    public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException { 
     String message = record.value().toString(); 
     Event event = EventFactory.createEvent(message); 
     String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID); 
     // add event to hashMap 
     LinkedBlockingQueue<Event> queue = hashMap.get(customerId); 
     if (queue == null) { 
      queue = new LinkedBlockingQueue<>(); 
      queue.add(event); 
      hashMap.put(customerId, queue); 
     } else { 
      queue.add(event); 
     } 
    } 
} 

正如你看到的我有「的HashMap」在收集我把我的事件,基於消息的相應的隊列「CUSTOMER_ID」屬性。 這種功能在多線程訪問的情況下需要額外的同步,正如我看到的,spring-kafka只爲所有容器創建一個bean實例,而不是爲每個容器創建單獨的bean實例,以避免併發問題。

如何以編程方式更改此邏輯?

我看到修復這個問題的唯一奇怪的方法是使用兩個JVM運行一個單獨的應用程序與單線程使用者,因此使用#receive方法訪問KafkaConsumer類將是單線程的。

回答

1

這是正確的。這是如何工作的。框架並不依賴於bean,而僅僅是它的方法來傳遞消息給函數。

您可能會考慮爲您的主題中的每個分區設置兩個@KafkaListener方法。確實,來自一個分區的記錄通過單個線程傳送到@KafkaListener。所以,如果你真的不能忍受這種狀態,你可以爲每個線程使用兩個HashMap

聆聽者抽象背後的一般想法正是大約無狀態行爲。那KafkaConsumer是普通的Spring 單身人士 bean。你必須接受這一事實,並根據這種情況重新設計你的解決方案。

+0

thx解釋 – MeetJoeBlack