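ConnectException when one Kafka broker in the cluster goes down
I have two Kafka brokers: server1:9092 and server2:9092. I am using the Java client to send messages to this cluster. This is the code: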
@Test
public void sendRecordToTopic() throws InterruptedException, ExecutionException {
    // See http://kafka.apache.org/documentation.html#newproducerconfigs
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "server1:9092,server2:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    ProducerRecord<String, String> myRecord =
            new ProducerRecord<String, String>("my-replicated-topic", "test", "someValue");
    boolean syncSend = true;
    if (syncSend) {
        // Send synchronously: block until the broker acknowledges the record
        producer.send(myRecord).get();
    } else {
        // Send asynchronously: return immediately without waiting for the result
        producer.send(myRecord);
    }
    producer.close();
}
When one of the brokers is down, the test in some cases throws this exception (in this example, "server1" was the broker that was down):
2015-11-02 17:59:29,138 WARN [org.apache.kafka.common.network.Selector] Error in I/O with server1/40.35.250.227
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
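
For reference, here is a minimal sketch of the same producer settings written with the plain string keys behind the ProducerConfig constants, plus two retry-related settings. The class name ProducerConfigSketch, the helper buildProps, and the values for "retries" and "retry.backoff.ms" are assumptions for illustration, not part of my actual setup, and I have not confirmed that they make this warning go away. The sketch only builds the Properties object so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

class ProducerConfigSketch {

    // Builds producer settings using the string keys that the
    // ProducerConfig constants resolve to (hypothetical helper).
    static Properties buildProps(String bootstrapServers) {
        Properties props = new Properties();
        // Comma-separated host:port list; the client only needs one
        // reachable entry to discover the rest of the cluster.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "1");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed values, not from the original test:
        // retry a failed send a few times, pausing between attempts.
        props.put("retries", "3");
        props.put("retry.backoff.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("server1:9092,server2:9092");
        // Print each setting so the effective configuration can be checked.
        for (String key : props.stringPropertyNames()) {
            System.out.println(key + " = " + props.getProperty(key));
        }
    }
}
```

The resulting Properties object could be passed to new KafkaProducer<String, String>(props) in place of the one built in the test above.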