2017-06-19 7 views
0

我想單元測試我的卡夫卡消費者。我正在嘗試使用MockConsumer類,該類自帶kafka-client java api。 下面是我的配置代碼kafka MockConsumer拋出異常錯誤java.lang.IllegalStateException:訂閱主題,分區和模式是互斥的

@Bean 
public MockConsumer consumer(){ 

    MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST); 
    consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0))); 

    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>(); 
    beginningOffsets.put(new TopicPartition("test-topic", 0), 0L); 
    consumer.updateBeginningOffsets(beginningOffsets); 

    consumer.addRecord(new ConsumerRecord<String, String>("test-topic",0, 
      0L, "mykey", "myvalue0")); 
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0, 
      1L, "mykey", "myvalue1")); 
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0, 
      2L, "mykey", "myvalue2")); 
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0, 
      3L, "mykey", "myvalue3")); 
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0, 
      4L, "mykey", "myvalue4")); 
    HashMap<TopicPartition, Long> endOffsets = new HashMap<>(); 
    endOffsets.put(new TopicPartition("test-topic", 0), 4L); 
    consumer.updateEndOffsets(endOffsets); 
    return consumer; 
} 

現在,當我在我的測試情況下,使用此MockConsumer豆像下面

@Autowired 
MockConsumer kafkaConsumer; 

@Autowired 
@InjectMocks 
MyConsumer myConsumer; //this is the class having consumer code. This 
         //is the class under test 

@Test 
public void testConsumeWithAutoAssignment() throws Exception { 
    myConsumer.consumeTopic("test-topic"); 
} 

我從

kafkaConsumer.subscribe(topic)

java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive

變得異常

請讓我知道是否有人發現問題或解決這個問題。

+0

我重構了我的代碼,只測試processConsumerRecords而不測試''''subscribe'''方法。 –

回答

5

這是因爲在這個bean中你使用的是consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0)));這意味着消費者想要從「測試主題」中的特定分區(0)中消費。然後在某個地方,但是我看不到從您提供的代碼到subscribe(topic)的電話號碼。通過訂閱,消費者成爲消費者羣體的一部分,卡夫卡經紀人自動分配分區(用於重新平衡)。您不能同時使用:分配特定分區(USER DEFINED)和訂閱自動分配。

+0

有沒有什麼方法可以單元測試消費者。我正在使用'''MockConsumer'''。而對於單元測試,我需要將分區分配給消費者,以便在測試期間消耗它。有沒有其他方法可以做到這一點?我知道,因爲訂閱類型變成USER DEFINED,它不可訂閱。 –

+0

我想這是你的救援:https://stackoverflow.com/a/42207232/731894基本上,使用'''rebalance'''不'''assign'''設置分區並設置分區,更新偏移量並添加記錄在''''subscribe'''調用之後 – CrazyGreenHand