我正在嘗試在java中使用kafka API。我正在使用以下maven依賴項:如何使用Java中的Kafka 8.2 API生成消息?
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.0</version>
</dependency>
我無法連接到遠程kafka服務器。 我將kafka'server.properties'文件端口屬性更改爲端口8080. 我可以啓動zookeeper和kafka服務器都沒有問題。 我也可以使用隨kafka下載的控制檯生產者和消費者應用程序。 (斯卡拉2.10版本)
我使用下面的客戶端代碼來創建遠程KafkaProducer
Properties propsProducer = new Properties();
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");
KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);
一旦我創建了製片人,我可以運行下面一行,並得到有效的主題信息返回,授予strTopic是現有的主題名稱。
List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);
當我嘗試發送一個消息,我請執行下列操作:
ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes());
RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();
調用send()塊無限期,當我手動終止過程中,我看到錯誤關閉套接字由於kafka服務器上的錯誤(IOException,連接重置由對等)錯誤。
此外,host.name,advertised.host.name和advertised.port屬性在'server.properties'文件中仍然被註釋掉,這是值得的。哦,如果我改變該行:
propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
到
propsProducer.put("bootstrap.servers", "127.0.0.1:8080");
並且安裝了卡夫卡服務器在同一臺服務器上運行它,它的作品,但我試圖與它合作遠程。
感謝任何幫助,如果我能澄清一切讓我知道。
你是否字面上使用'172.xx.xx.xxx'作爲主機IP地址? – 2015-03-30 18:48:52
不,這是一個完整的IP,X的只是口罩。 – 2015-03-30 18:53:00
Kk。也許防火牆問題?您可以使用netcat驗證端口8080上的網絡連接嗎? – 2015-03-30 19:56:50