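Kafka: intermittent slowness when consuming the first message from a topic

I am using Kafka 0.9.0.1.

The first time I start my application, it takes 20-30 seconds to retrieve the "latest" message from the topic. I have tried different Kafka brokers (with different configs), but I still see this behaviour. Subsequent messages usually arrive without any slowness.

Is this expected behaviour? You can see it clearly by running the sample application below, changing the broker and topic names to match your own setup.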

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import com.google.common.collect.Lists;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class KafkaProducerConsumerTest {

        public static final String KAFKA_BROKERS = "...";
        public static final String TOPIC = "...";

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            new KafkaProducerConsumerTest().run();
        }

        public void run() throws ExecutionException, InterruptedException {
            Properties consumerProperties = new Properties();
            consumerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
            consumerProperties.setProperty("group.id", "Test");
            consumerProperties.setProperty("auto.offset.reset", "latest");
            consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // Start the consumer loop on a background thread.
            MyKafkaConsumer kafkaConsumer = new MyKafkaConsumer(consumerProperties, TOPIC);
            Executors.newFixedThreadPool(1).submit(() -> kafkaConsumer.consume());

            Properties producerProperties = new Properties();
            producerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
            producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // Publish a single test message from the main thread.
            MyKafkaProducer kafkaProducer = new MyKafkaProducer(producerProperties, TOPIC);
            kafkaProducer.publish("Test Message");
        }
    }

    class MyKafkaConsumer {

        private final Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);

        private KafkaConsumer<String, Object> kafkaConsumer;

        public MyKafkaConsumer(Properties properties, String topic) {
            kafkaConsumer = new KafkaConsumer<String, Object>(properties);
            kafkaConsumer.subscribe(Lists.newArrayList(topic));
        }

        public void consume() {
            while (true) {
                logger.info("Started listening...");
                // Wait (effectively without a timeout) for records, then log the first one.
                ConsumerRecords<String, Object> consumerRecords = kafkaConsumer.poll(Long.MAX_VALUE);
                logger.info("Received records {}", consumerRecords.iterator().next().value());
            }
        }
    }

    class MyKafkaProducer {

        private KafkaProducer<String, Object> kafkaProducer;
        private String topic;

        public MyKafkaProducer(Properties properties, String topic) {
            this.kafkaProducer = new KafkaProducer<String, Object>(properties);
            this.topic = topic;
        }

        public void publish(Object object) throws ExecutionException, InterruptedException {
            ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, "key", object);
            Future<RecordMetadata> response = kafkaProducer.send(producerRecord);
            // Block until the send has completed.
            response.get();
        }
    }

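Thanks for trying it out. I only see this behaviour intermittently, so if you didn't see it, please try a few more times and you should see the delay. Also, I appreciate your theory, but I have also tried publishing the message a second after "Started listening..." is logged, and it still takes around 20 seconds. – DJ180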