2016-11-11 24 views
1

我對卡夫卡完全陌生,我有一些使用KafkaProducer的麻煩。 生產者的發送方法阻塞1分鐘,然後應用程序繼續進行,沒有例外。這顯然是一些超時,但沒有引發異常。KafkaProducer not sending記錄

我在日誌中也看不到任何東西。

服務器接縫要正確設置。如果我使用bin/kafka-console-consumer和producer應用程序,我可以正確地發送和接收消息。代碼接縫也可以在一定程度上發揮作用。 如果我想寫一個不存在的話題,我可以在/ tmp/kafka-logs文件夾中看到新條目,並且也可以在KafkaServer的控制檯輸出中看到。 這裏是我的代碼使用方法:

Properties props = ResourceUtils.loadProperties("kafka.properties"); 
    Producer<String, String> producer = new KafkaProducer<>(props); 

    for (String line : lines) 
    { 
     producer.send(new ProducerRecord<>("topic", Id, line)); 
     producer.flush(); 
    } 
    producer.close(); 

在kafka.properties文件的屬性:

bootstrap.servers=localhost:9092 
key.serializer=org.apache.kafka.common.serialization.StringSerializer 
value.serializer=org.apache.kafka.common.serialization.StringSerializer 
acks=all 
retries=0 
batch.size=16384 
linger.ms=1 
buffer.memory=33554432 

所以,producer.send塊1分鐘,然後繼續。最後,卡夫卡沒有儲存任何東西,但新的主題已經創建。 謝謝你的幫助!

+0

#1:我沒有看到在這裏的任何日誌消息,所以你怎麼知道它的'發送()'是正在服用一種分鐘? – kdgregory

+0

#2:在send()返回的'Future'上調用'get()'會發生什麼? – kdgregory

+0

然後我看到以下異常:org.apache.kafka.common.errors.TimeoutException:無法在60000毫秒後更新元數據。 我也嘗試過,當我打電話給producder的partitionsFor(「topic」)方法時,我得到這個異常。 – metabolic

回答

0

嘗試設置bootstrap.servers127.0.0.1:9092

+0

解決了我的問題 –

+0

如果您有新的問題,請點擊[Ask Question](問問題)(http://stackoverflow.com/questions/ask)按鈕。如果有助於提供上下文,請包含此問題的鏈接。 - [來自評論](/ review/low-quality-posts/15335062) – Gandhi