2017-09-07 145 views
0

從有用的信息 Cribbing,我試圖得到一個單元測試運行,演示消息分佈在同一組的多個成員偵聽同一主題的消息分佈。我剛開始使用卡夫卡,所以我可能會錯過一些根本性的東西。春季Kafka多人組單元測試

我在setup()中沒有得到完整的分區分配。如果我將這三位聽衆/同一組的成員評論出來,並將測試改爲看起來像引用的問題(不同的組/廣播場景),那麼一切都可行。任何指針將不勝感激。 謝謝!

我的設置:

測試類:

@RunWith(SpringRunner.class) 
@SpringBootTest 
@DirtiesContext 
public class SpringKafkaApplicationTest { 

    private static final Logger LOGGER = LoggerFactory.getLogger(SpringKafkaApplicationTest.class); 

    private final static String BAR_TOPIC = "bar.t"; 
    private final static String FOO_TOPIC = "foo.t"; 

    @Autowired 
    private Sender sender; 

    @Autowired 
    private Receiver receiver; 
    // @Autowired 
    // private Receiver receiver2_G1; 
    // @Autowired 
    // private Receiver2 receiver1_G2; 

    @Autowired 
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; 

    // create 3 partitions per topic - to support up to 3 consumers in a group 
    @ClassRule 
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 3, BAR_TOPIC, FOO_TOPIC); 

    @Before 
    public void setUp() throws Exception { 
     // wait until the partitions are assigned 
     for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry 
       .getListenerContainers()) { 
      LOGGER.info("calling waitForAssignment on messageListenerContainer.getClass(): " + messageListenerContainer.getClass()); 
      LOGGER.info("embeddedKafka.getPartitionsPerTopic(): " + embeddedKafka.getPartitionsPerTopic()); 

      ContainerTestUtils.waitForAssignment(messageListenerContainer, 
        embeddedKafka.getPartitionsPerTopic()); 
     } 
    } 

    @Test 
    public void testReceive() throws Exception { 
     for (int i = 0; i < 10; i++) { 
      sender.send(BAR_TOPIC, "testkey_" + i, new Bar("bar")); 
      sender.send(FOO_TOPIC, "testkey_" + i, new Foo("foo")); 
     } 
... 

接收器配置類:

@Configuration 
@EnableKafka 
public class ReceiverConfig { 

    @Value("${kafka.bootstrap-servers}") 
    private String bootstrapServers; 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1"); // not really used since groupId annotation overrides 

     return props; 
    } 

    @Bean 
    public ConsumerFactory<String, String> consumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), 
       new StringDeserializer()); 
    } 

    @Bean 
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(consumerFactory()); 
     factory.setMessageConverter(new StringJsonMessageConverter()); 

     return factory; 
    } 

    @Bean 
    public Receiver receiver() { 
     return new Receiver(); 
    } 
} 

接收器類聽衆:

public class Receiver { 

    // test 3 listeners in same group - each should get 1/3 of transmitted objects 
    @KafkaListener(id = "barListener_sharedGroup1", topics = "${kafka.topic.bar}", groupId = "bar_sharedGroup") 
    public void receiveBarSharedGroup1(@Payload Bar bar, 
      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, 
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { 

     LOGGER.info("receiveBarSharedGroup1(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic); 
     multigroup1latch.countDown(); 
    } 

    @KafkaListener(id = "barListener_sharedGroup2", topics = "${kafka.topic.bar}", groupId = "bar_sharedGroup") 
    public void receiveBarSharedGroup2(@Payload Bar bar, 
      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, 
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { 

     LOGGER.info("receiveBarSharedGroup2(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic); 
     multigroup2latch.countDown(); 
    } 

    @KafkaListener(id = "barListener_sharedGroup3", topics = "${kafka.topic.bar}", groupId = "bar_sharedGroup") 
    public void receiveBarSharedGroup3(@Payload Bar bar, 
      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, 
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { 

     LOGGER.info("receiveBarSharedGroup3(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic); 
     multigroup3latch.countDown(); 
    } 

更換這些聽衆無線個不同的組,這個工作正常:

//works - bar_group1 listens on topic 'bar' and should receive all that are sent 
@KafkaListener(id = "barListener_group1", topics = "${kafka.topic.bar}", groupId = "bar_group1") 
public void receiveBarGroup1(@Payload Bar bar, 
     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, 
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { 

    LOGGER.info("receiveBarGroup1(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic); 
    eventProcessor.insertLocationData(bar.toString()); 
    group1latch.countDown(); 
} 

// same as above for bar_group2 
@KafkaListener(id = "barListener_group2", topics = "${kafka.topic.bar}", groupId = "bar_group2") 
public void receiveBarGroup2(@Payload Bar bar, 
     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, 
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { 

    LOGGER.info("receiveBarGroup2(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic); 
    eventProcessor.processNotificationMessage(bar.toString()); 
    group2latch.countDown(); 
} 

// this is using default group - again a different group, so should receive all transmitted 'bar' objs 
@KafkaListener(id = "barListener_groupD", topics = "${kafka.topic.bar}") 
public void receiveBar(@Payload Bar bar, 
     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, 
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { 

    LOGGER.info("receiveBar(): received {} with key {}, from partition {}, on topic {}", bar, key, partition, topic); 
    latch.countDown(); 
} 

// also using default group - listening for 'foo' objects, should receive all transmitted 'foo' objs 
@KafkaListener(id = "fooListener_groupD", topics = "${kafka.topic.foo}") 
public void receiveFoo(@Payload Foo foo, 
     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, 
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, 
     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { 

    LOGGER.info("receiveFoo(): received {} with key {}, from partition {}, on topic {}", foo.toString(), key, partition, topic); 
    latch.countDown(); 
} 

調試/應用程序日誌:

14:11:18.940 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Starting SpringKafkaApplicationTest on WB-ATingley-i5 with PID 3140 (started by atingley in C:\dev\tools\kafka\spring-kafka-master\apt-spring-kafka-multiple-groups) 
14:11:18.941 [main] INFO c.c.kafka.SpringKafkaApplicationTest - No active profile set, falling back to default profiles: default 
14:11:21.089 [main] INFO c.c.kafka.SpringKafkaApplicationTest - Started SpringKafkaApplicationTest in 2.477 seconds (JVM running for 4.09) 
14:11:21.132 [main] INFO c.c.kafka.SpringKafkaApplicationTest - calling waitForAssignment on messageListenerContainer.getClass(): class org.springframework.kafka.listener.ConcurrentMessageListenerContainer 
14:11:21.132 [main] INFO c.c.kafka.SpringKafkaApplicationTest - embeddedKafka.getPartitionsPerTopic(): 3 
14:11:21.353 [barListener_sharedGroup3-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[] 
14:11:21.353 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
14:11:21.403 [barListener_sharedGroup2-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[] 
14:11:21.404 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
14:11:21.411 [barListener_sharedGroup1-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[] 
14:11:21.411 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
14:11:21.434 [barListener_sharedGroup3-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[bar.t-0, bar.t-2, bar.t-1] 
14:11:22.076 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
14:11:23.077 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
14:11:24.077 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
14:11:24.438 [barListener_sharedGroup3-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions revoked:[bar.t-0, bar.t-2, bar.t-1] 
14:11:24.438 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
14:11:24.445 [barListener_sharedGroup2-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[bar.t-0] 
14:11:24.446 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
14:11:24.446 [barListener_sharedGroup3-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[bar.t-1] 
14:11:24.446 [barListener_sharedGroup1-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer - partitions assigned:[bar.t-2] 

<<< at this point, partition assignment looks as I would expect it - one per listener - but we are waiting on an assignment of three per listener? >>> 

14:11:24.446 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
14:11:25.077 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
14:11:25.446 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
... 
14:12:20.447 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
14:12:20.449 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
14:12:21.079 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records 
14:12:21.199 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
14:12:21.199 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
14:12:21.199 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {} 
14:12:21.203 [barListener_sharedGroup1-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped 
14:12:21.204 [barListener_sharedGroup1-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer - KafkaMessageListenerContainer [id=barListener_sharedGroup1-0, clientIndex=-0, topicPartitions=[bar.t-2]] stopped normally 
14:12:21.208 [barListener_sharedGroup3-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped 
14:12:21.208 [barListener_sharedGroup3-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer - KafkaMessageListenerContainer [id=barListener_sharedGroup3-0, clientIndex=-0, topicPartitions=[bar.t-1]] stopped normally 
14:12:21.212 [barListener_sharedGroup2-0-C-1] INFO o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped 
14:12:21.212 [barListener_sharedGroup2-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer - KafkaMessageListenerContainer [id=barListener_sharedGroup2-0, clientIndex=-0, topicPartitions=[bar.t-0]] stopped normally 

謝謝!

哦 - 單元測試斷言失敗(變暖......!)。 謝謝!

SpringKafkaApplicationTest (3) 
com.codenotfound.kafka.SpringKafkaApplicationTest 
testReceive(com.codenotfound.kafka.SpringKafkaApplicationTest) 
org.junit.ComparisonFailure: expected:<[3]> but was:<[1]> 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) 
    at org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(ContainerTestUtils.java:74) 
    at com.codenotfound.kafka.SpringKafkaApplicationTest.setUp(SpringKafkaApplicationTest.java:62) 
+0

您可以編輯的問題表明,包括主題/分區的分配,以及測試日誌的日誌? –

回答

0

這裏是我的問題,從不同的單元測試複製/粘貼錯誤 - 儘管我需要3個分區,1元聽衆,每個聽衆容器只被分配一個分區(如預期):

ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    embeddedKafka.getPartitionsPerTopic()); 

...應該是:

ContainerTestUtils.waitForAssignment(messageListenerContainer, 1);