
I expected the consume test to read just one message and shut down. But it doesn't, even after I call consumer.shutdown(). I'd like to know why. Why doesn't the Kafka consumer shut down?

The test:

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class AppTest
{
    App app = new App();

    @org.junit.Test
    public void publish()
    {
        assertTrue(app.publish("temptopic", "message"));
    }

    @org.junit.Test
    public void consume()
    {
        app.publish("temptopic", "message");
        assertEquals("message", app.consume("temptopic", "tempgroup"));
    }
}

The class under test:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class App
{
    public boolean publish(String topicName, String message) {

        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("request.required.acks", "1");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, "mykey", message);

        try {
            // Block until the broker acknowledges the write.
            Future<RecordMetadata> future = producer.send(record);
            future.get();
        } catch (Exception e) {
            return false;
        } finally {
            producer.close();
        }

        return true;
    }

    public String consume(String topicName, String groupId)
    {
        Properties p = new Properties();
        p.put("zookeeper.connect", "localhost:2181");
        p.put("group.id", groupId);
        p.put("zookeeper.session.timeout.ms", "500");
        p.put("zookeeper.sync.time.ms", "250");
        p.put("auto.commit.interval.ms", "1000");

        ConsumerConnector consumer = null;
        String result = "";

        try
        {
            consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));

            Map<String, Integer> topicPartitionCount = new HashMap<String, Integer>();
            topicPartitionCount.put(topicName, Integer.valueOf(1)); // use 1 thread to read

            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicPartitionCount);

            List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);

            for (KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    MessageAndMetadata<byte[], byte[]> msg = it.next();
                    String strMsg = new String(msg.message());
                    System.out.println("received: " + strMsg);
                    result += strMsg;
                }
            }
        }
        finally
        {
            if (consumer != null)
            {
                consumer.shutdown();
            }
        }
        return result;
    }
}

Answer


By default the ConsumerIterator will block indefinitely, simply waiting for the next message. To get out of the while loop you have to interrupt that thread.
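One way to break out of the blocking hasNext() in a test is to trigger consumer.shutdown() from a second thread after a deadline (a slightly different mechanism from a raw thread interrupt): with the old high-level consumer, shutdown() enqueues an end-of-stream marker, so the blocked iterator stops waiting. A minimal sketch, assuming the same kafka.consumer API as in the question; the 5-second deadline and the helper names are illustrative, not from the answer:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ShutdownFromAnotherThread
{
    // Reads from the stream until the reaper thread shuts the connector down.
    public static String consumeWithDeadline(KafkaStream<byte[], byte[]> stream,
                                             final ConsumerConnector consumer)
    {
        ScheduledExecutorService reaper = Executors.newSingleThreadScheduledExecutor();
        // After 5 seconds, shut the connector down from another thread; the
        // blocking hasNext() below then returns false instead of waiting forever.
        reaper.schedule(new Runnable() {
            public void run() { consumer.shutdown(); }
        }, 5, TimeUnit.SECONDS);

        StringBuilder result = new StringBuilder();
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            result.append(new String(it.next().message()));
        }
        reaper.shutdownNow();
        return result.toString();
    }
}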

There is a consumer property, "consumer.timeout.ms", which can be set to a value after which a timeout exception is thrown to the consumer, but this may not be the most practical solution for a unit test.
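As an illustration, a minimal sketch of that approach, again assuming the old high-level consumer from the question; the 2000 ms value and the consumeUntilIdle name are illustrative:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TimeoutConsumeExample
{
    public static String consumeUntilIdle(String topicName, String groupId)
    {
        Properties p = new Properties();
        p.put("zookeeper.connect", "localhost:2181");
        p.put("group.id", groupId);
        p.put("consumer.timeout.ms", "2000"); // hasNext() throws after 2s with no message

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));
        StringBuilder result = new StringBuilder();
        try {
            Map<String, Integer> threads = new HashMap<String, Integer>();
            threads.put(topicName, 1);
            List<KafkaStream<byte[], byte[]>> streams =
                    consumer.createMessageStreams(threads).get(topicName);
            ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
            while (it.hasNext()) {
                result.append(new String(it.next().message()));
            }
        } catch (ConsumerTimeoutException e) {
            // No message arrived within consumer.timeout.ms -- treat the topic as drained.
        } finally {
            consumer.shutdown();
        }
        return result.toString();
    }
}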


What is the pattern for consuming all available messages and then shutting down? For example, the consumer needs to process 100 messages and then terminate. – 2015-04-05 12:46:50


Generally the intent of a consumer is to "run forever", and the message stream is usually treated as unbounded. If you want to stop after 100 messages you can do something like: while (i < 100 && it.hasNext()) { ... i++; } – Lundahl 2015-04-05 13:42:05
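A minimal sketch of that bounded loop; readUpTo and maxMessages are illustrative names, not from the comment:

import kafka.consumer.ConsumerIterator;

public class BoundedReadExample
{
    // Read at most maxMessages from an otherwise unbounded stream, then return.
    public static String readUpTo(ConsumerIterator<byte[], byte[]> it, int maxMessages)
    {
        StringBuilder sb = new StringBuilder();
        int read = 0;
        while (read < maxMessages && it.hasNext()) {
            sb.append(new String(it.next().message()));
            read++;
        }
        return sb.toString();
    }
}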


To consume whatever is in a topic at a given point in time and then terminate, you would need to use the consumer.timeout.ms property, because the consumer cannot know whether a producer is publishing new messages at the same time, or how many messages a finite stream will ultimately contain. – Lundahl 2015-04-05 13:51:19