2015-05-04

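Unable to push messages to Apache Kafka?

I am new to Kafka and am trying to run a sample Apache Java producer to push data to Kafka. I can create new topics from Java, but I get an exception when pushing a message. Here is the code: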

package kafkaTest; 

import java.text.SimpleDateFormat; 
import java.util.Date; 
import java.util.List; 
import java.util.Properties; 

import org.I0Itec.zkclient.ZkClient; 
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; 

import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class HelloKafkaProducer {
    final static String TOPIC = "test_kafka1";

    public static void main(String[] argv) {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "172.25.37.66:9092");
        ZkClient zkClient = new ZkClient("172.25.37.66:2181", 4000, 6000, new BytesPushThroughSerializer());
        List<String> brokerList = zkClient.getChildren("/brokers/topics");
        for (int i = 0; i < brokerList.size(); i++) {
            System.out.println(brokerList.get(i));
        }

        properties.put("zk.connect", "172.25.37.66:2181");

        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);

        kafka.javaapi.producer.Producer<String, String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
        SimpleDateFormat sdf = new SimpleDateFormat();
        KeyedMessage<String, String> message = new KeyedMessage<String, String>(TOPIC, "Test message from java program " + sdf.format(new Date()));
        System.out.println(message);
        producer.send(message);
        /*Consumer consumerThread = new Consumer(TOPIC);
        consumerThread.start();*/
    }
}

Here is the program output, followed by the stack trace:

topic1 
test_kafka1 
topic11 
test 
test_kafka 
KeyedMessage(test_kafka1,null,null,Test message from java program 4/5/15 1:30 PM) 
Exception in thread "main" [2015-05-04 13:30:41,432] ERROR Failed to send requests for topics test_kafka1 with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler:97) 
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) 
at kafka.producer.Producer.send(Producer.scala:77) 
at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
at kafkaTest.HelloKafkaProducer.main(HelloKafkaProducer.java:54) 

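On the broker console, I see [2015-05-04 18:55:29,959] INFO Closing socket connection to /172.17.70.73. (kafka.network.Processor) every time I run the program. I am able to produce to and consume from the topic using the console tools.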

Any help would be appreciated. Thanks.

Answer

With the Kafka producer, you do not connect to ZooKeeper. You have to connect to the brokers. To do that, use the following property:

props.put("metadata.broker.list", "localhost:9092, broker1:9092"); 

Here I have used localhost; in your case it would be 172.25.37.66.
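As a rough sketch of what that suggestion could look like for the code in the question, the producer properties can be built with only the broker list and the serializer, leaving out the `zk.connect` entry. This assumes the old Scala producer API shown in the question (`kafka.javaapi.producer.Producer` with `ProducerConfig`). The class name `ProducerProps` and the method `brokerOnlyProps` are hypothetical names used for illustration only.

```java
import java.util.Properties;

public class ProducerProps {

    // Builds producer properties that point only at the brokers.
    // No ZooKeeper address is set, following the answer above.
    static Properties brokerOnlyProps(String brokerList) {
        Properties props = new Properties();
        // Comma-separated host:port pairs of Kafka brokers (not ZooKeeper).
        props.setProperty("metadata.broker.list", brokerList);
        // Same serializer as in the question's code.
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }

    public static void main(String[] args) {
        // Broker address taken from the question; adjust for your setup.
        Properties props = brokerOnlyProps("172.25.37.66:9092");
        System.out.println(props.getProperty("metadata.broker.list"));
        // With the Kafka jar on the classpath, these properties would then be
        // passed to the producer as in the question:
        //   ProducerConfig config = new ProducerConfig(props);
        //   new kafka.javaapi.producer.Producer<String, String>(config);
    }
}
```

The Kafka calls are left as comments so that the sketch compiles without the Kafka dependency. Whether this change alone resolves the `FailedToSendMessageException` depends on the broker being reachable from the client at the address it advertises.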