2015-03-30 17 views
5

我正在嘗試在java中使用kafka API。我正在使用以下maven依賴項:如何使用Java中的Kafka 8.2 API生成消息?

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>0.8.2.0</version> 
</dependency> 

我無法連接到遠程kafka服務器。 我將kafka'server.properties'文件端口屬性更改爲端口8080. 我可以啓動zookeeper和kafka服務器都沒有問題。 我也可以使用隨kafka下載的控制檯生產者和消費者應用程序。 (斯卡拉2.10版本)

我使用下面的客戶端代碼來創建遠程KafkaProducer

Properties propsProducer = new Properties(); 

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 
propsProducer.put("topic.metadata.refresh.interval.ms", "0"); 

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer); 

一旦我創建了製片人,我可以運行下面一行,並得到有效的主題信息返回,授予strTopic是現有的主題名稱。

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic); 

當我嘗試發送一個消息,我請執行下列操作:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes()); 

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get(); 

調用send()塊無限期,當我手動終止過程中,我看到錯誤關閉套接字由於kafka服務器上的錯誤(IOException,連接重置由對等)錯誤。

此外,host.name,advertised.host.name和advertised.port屬性在'server.properties'文件中仍然被註釋掉,這是值得的。哦,如果我改變該行:

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080"); 

propsProducer.put("bootstrap.servers", "127.0.0.1:8080"); 

並且安裝了卡夫卡服務器在同一臺服務器上運行它,它的作品,但我試圖與它合作遠程。

感謝任何幫助,如果我能澄清一切讓我知道。

+1

你是否字面上使用'172.xx.xx.xxx'作爲主機IP地址? – 2015-03-30 18:48:52

+0

不,這是一個完整的IP,X的只是口罩。 – 2015-03-30 18:53:00

+0

Kk。也許防火牆問題?您可以使用netcat驗證端口8080上的網絡連接嗎? – 2015-03-30 19:56:50

回答

3

經過大量的挖掘,我決定實施這裏找到的例子:Kafka Producer Example。我縮短了代碼,沒有實現分區類。我更新了我的pom列出的依賴關係,並且我仍然遇到同樣的問題。最終,我做了一些配置更改,一切正常。

最後一塊難題是在服務器和客戶端機器的/ etc/hosts中定義Kafka服務器。我將以下內容添加到兩個文件中。

172.xx.xx.xxx  serverHost1 

再次,x的只是口罩。然後,我將server.properties文件中的advertised.host.name設置爲serverHost1。注意:在服務器計算機上運行ifconfig後,我獲得了該IP。

我改了行

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080"); 

propsProducer.put("metadata.broker.list", "serverHost1:8080"); 

卡夫卡API不喜歡,我是定義IP作爲一個字符串的事實。相反,它從etc/hosts文件中查找IP,雖然文檔中提到:

「代理將向生產者和消費者通告的主機名。如果未設置,則使用」host.name「的值(如果已配置)否則,它將使用從java.net.InetAddress.getCanonicalHostName()返回的值。「

這將只返回IP,在字符串形式,我以前使用,如果沒有定義在客戶端機器的etc/hosts中,否則它返回與IP(serverHost1在我的情況下)配對的名稱。另外,我從來沒有設置host.name的值。

+0

是bootstrap.servers替換metadata.broker.list? – 2016-08-22 05:55:05

+1

是的,我相信。在0.8.2.0版本中,該字段爲「metadata.broker.list」,但在較新的版本中爲「boostrap.servers」 – 2016-08-22 13:53:33

+0

是的!這是真的。與新的ProducerAPI這是新的配置。 – 2016-08-23 04:24:02

相關問題