我使用0.8.2.1 kafka-clients庫連接到Kafka。當卡夫卡上線時,我能夠成功連接,但當卡夫卡倒閉時,我想優雅地處理失敗。下面是我的配置:我如何優雅地處理卡夫卡中斷?
kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
kafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
kafkaProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
producer = new KafkaProducer(kafkaProperties);
當卡夫卡下來,我得到了我的日誌以下錯誤:
WARN: 07 Apr 2015 14:09:49.230 org.apache.kafka.common.network.Selector:276 - [] Error in I/O with localhost/127.0.0.1
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_75]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) ~[na:1.7.0_75]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
在無限循環中重複這個錯誤,並鎖定了我的Java應用程序。我嘗試了與超時,重試和確認有關的各種配置設置,但是我一直無法防止發生這種循環。
有沒有可以防止這種情況的配置設置?我需要嘗試不同版本的客戶端嗎?如何優雅地處理Kafka中斷?
非常有幫助,謝謝。這是正式記錄在任何地方? – maxenglander
@maxenglander:不是我看到的。我不得不通過代碼去探索,並且有很多嘗試和錯誤。 –
「而不是完全異步」 – vbence