1
我期待消耗測試只讀取一條消息並關閉。但是,即使在我打電話給consumer.shutdown()
之後也不是。想知道爲什麼?爲什麼卡夫卡消費者不關機?
測試
public class AppTest
{
App app=new App();
@org.junit.Test
public void publish()
{
assertTrue(app.publish("temptopic","message"));
}
@org.junit.Test
public void consume()
{
app.publish("temptopic","message");
assertEquals("message",app.consume("temptopic","tempgroup"));
}
}
類在測試
public class App
{
public boolean publish(String topicName, String message) {
Properties p=new Properties();
p.put("bootstrap.servers","localhost:9092");
p.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
p.put("request.required.acks","1");
KafkaProducer producer = new KafkaProducer<String,String>(p);
ProducerRecord record=new ProducerRecord(topicName,"mykey",message);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata=null;
try {
recordMetadata = future.get();
} catch (Exception e) {
return false;
}
return true;
}
public String consume(String topicName, String groupId)
{
Properties p=new Properties();
p.put("zookeeper.connect","localhost:2181");
p.put("group.id","groupId");
p.put("zookeeper.session.timeout.ms","500");
p.put("zookeeper.sync.time.ms","250");
p.put("auto.commit.interval.ms", "1000");
ConsumerConnector consumer=null;
String result = "";
try
{
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));
Map<String, Integer> topicPartitionCount = new HashMap<String, Integer>();
topicPartitionCount.put(topicName, Integer.valueOf(1));//use 1 thread to read
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicPartitionCount);
List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);
for (KafkaStream stream : streams) {
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata msg = it.next();
String strMsg = new String((byte[]) msg.message());
System.out.println("received: " + strMsg);
result += strMsg;
}
}
}
finally
{
if (consumer != null)
{
consumer.shutdown();
}
}
return result;
}
}
消耗所有可用消息並關閉的模式是什麼?對於例如消費者需要處理100條消息,然後終止。 – 2015-04-05 12:46:50
一般而言,消費者的意圖是「永遠運行」,消息流往往被認爲是無限的。如果你想在100條消息後停下來,你可以做一些事情: while(i <100 && it.hasNext()){ ... i ++; } – Lundahl 2015-04-05 13:42:05
要在某個時間點消耗主題中的任何內容,然後終止,那麼您需要使用consumer.timeout.ms屬性,因爲消費者無法知道生產者是否同時發佈新消息,或者有限流的最終消息數量是多少。 – Lundahl 2015-04-05 13:51:19