我對卡夫卡完全陌生,我有一些使用KafkaProducer的麻煩。 生產者的發送方法阻塞1分鐘,然後應用程序繼續進行,沒有例外。這顯然是一些超時,但沒有引發異常。KafkaProducer not sending記錄
我在日誌中也看不到任何東西。
服務器接縫要正確設置。如果我使用bin/kafka-console-consumer和producer應用程序,我可以正確地發送和接收消息。代碼接縫也可以在一定程度上發揮作用。 如果我想寫一個不存在的話題,我可以在/ tmp/kafka-logs文件夾中看到新條目,並且也可以在KafkaServer的控制檯輸出中看到。 這裏是我的代碼使用方法:
Properties props = ResourceUtils.loadProperties("kafka.properties");
Producer<String, String> producer = new KafkaProducer<>(props);
for (String line : lines)
{
producer.send(new ProducerRecord<>("topic", Id, line));
producer.flush();
}
producer.close();
在kafka.properties文件的屬性:
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=all
retries=0
batch.size=16384
linger.ms=1
buffer.memory=33554432
所以,producer.send塊1分鐘,然後繼續。最後,卡夫卡沒有儲存任何東西,但新的主題已經創建。 謝謝你的幫助!
#1:我沒有看到在這裏的任何日誌消息,所以你怎麼知道它的'發送()'是正在服用一種分鐘? – kdgregory
#2:在send()返回的'Future'上調用'get()'會發生什麼? – kdgregory
然後我看到以下異常:org.apache.kafka.common.errors.TimeoutException:無法在60000毫秒後更新元數據。 我也嘗試過,當我打電話給producder的partitionsFor(「topic」)方法時,我得到這個異常。 – metabolic