2017-07-27 179 views
0

我試圖從卡夫卡消費多個主題使用相同的@KafkaListener實現,但我想每個主題有一個消費者(每個只有一個分區)。爲了實現這一點,我創建了@KafkaListener,其中topicPattern="topic1|topic2|topic3"和ConcurrentKafkaListenerContainerFactory具有併發3和groupId consumer_group。問題是,所有的主題都分配到相同的消費者和其他消費者2變爲空閒如下面的日誌顯示:春季卡夫卡 - 所有主題到同一個消費者

21:49:13.140 [org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [] for group consumer_group 21:49:13.140 [org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [] for group consumer_group 21:49:13.141 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [topic1, topic2, topic3] for group consumer_group

我如何暗示春季卡夫卡到每個主題傳播到不同的消費者?

Cheers

回答

1

對每個消費者使用不同的GroupID。由於你所有的三個消費者都在同一個小組中,他們將把這些分區從一個主題中劃分出來。由於每個主題有1個分區,只有一個消費者可以分配它。

此外,不要在訂閱調用中包含每個主題,只是您在每個客戶中需要的主題。

在KAFKAs網站上閱讀更多信息。他們很清楚地說明了這一點

替代方案:保持相同的組ID,但讓每個消費者subsribe就是了只

0

併發的消息偵聽器將創建3名消費者與同組ID的話題;這是通過設計。做你想做的,你需要3x @KafkaListener,每個主題一個;他們當然可以委託給相同的方法。

@KafkaListener(topic = ...) 
public void l1(...) { 
    doListen(...); 
} 

@KafkaListener(topic = ...) 
public void l2(...) { 
    doListen(...); 
} 

@KafkaListener(topic = ...) 
public void l3(...) { 
    doListen(...); 
} 

private void doListen(...) { 
    ... 
} 
相關問題