2016-11-12 29 views
2
val props = new Properties() 
props.put("bootstrap.servers", "foo:9092,bar:9092") 
props.put("acks", "all") 
props.put("retries", 1) 
props.put("batch.size", 10000) 
props.put("linger.ms", 5) 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 

try { 
    producer.send(new ProducerRecord[String, String](topic, key, msg.toJson)).get() 
    true 
} catch { 
    case ex: Throwable => { 
     println(ex) 
     false 
    } 
} 

此代碼拋出一個異常失敗後48毫秒

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 48 ms.\n\tat 
org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)\n\tat 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)\n\tat 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)\ 

注意,我張貼對生產集羣數據,因此它的啓動和運行,很多應用已經成功地發佈消息到主題更新元數據。它只是我的代碼沒有發佈。

+0

我注意到max.block.ms被設置爲48 ms,這是一個非常低的值。爲什麼你設置這樣的值來檢索元數據? – amethystic

+0

@amethystic這不是在給定的配置中,你從哪裏得到的? – annedroiid

+0

從「48ms後無法更新元數據」。 – amethystic

回答

1

看起來主機名或端口有問題,因爲您無法連接生產者和TimeoutException。你有沒有嘗試使用相同的生產者配置運行kafka-console-producer.sh?它真的在那臺機器上工作嗎?也許,與代理的連接受SSL或SASL保護。不要忘記打開config/tools-log4j.properties中的TRACE日誌記錄,這將幫助您調試您的問題。

0

我之前收到了相同的超時異常,當我試圖使用控制檯生產者發送消息時,我得到了WARN Error while fetching metadata with correlation id 656 : {metadata-1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

對於我自己來說,停止並重新啓動經紀人讓所有的事情都能夠重新開始。