2016-07-26 107 views
2

我按照以下說明設置了一個多節點kafka羣集。 現在,如何連接到動物園管理員?在JAVA中連接到生產者/消費者端的一個動物園管理員,還是有辦法連接所有的動物園管理員節點?在Apache Kafka多節點羣集中連接到Zookeeper

設置多節點的Apache ZooKeeper的集羣

在羣集的每個節點添加下列行到文件卡夫卡/配置/ zookeeper.properties

server.1=zNode01:2888:3888 
    server.2=zNode02:2888:3888 
    server.3=zNode03:2888:3888 
    #add here more servers if you want 
    initLimit=5 
    syncLimit=2 

在羣集的每個節點創建一個文件在由dataDir屬性表示的文件夾中調用myid(默認情況下文件夾爲/ tmp/zookeeper)。該身份識別碼文件應該只包含Z序節點的ID(「1」 zNode01,「2」 ZNode02,等...)

設置多代理的Apache卡夫卡集羣

在羣集中的每個節點修改修改從文件卡夫卡/配置/ server.properties屬性zookeeper.connect:

zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181 

在羣集的每個節點從文件卡夫卡/配置/ server.properties修改屬性host.name: host.name = zNode0x

在羣集的每個節點上修改文件kafka/config/server.properties中的屬性broker.id(羣集中的每個代理應具有唯一的ID)

回答

3

您可以將生產者或消費者中的所有節點。卡夫卡有足夠的智能,它會連接到具有您所需的基礎上,複製因子或分區

這裏的數據節點消費者代碼:

Properties props = new Properties(); 
    props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092"); 
    props.put("group.id", "test"); 
    props.put("enable.auto.commit", "true"); 
    props.put("auto.commit.interval.ms", "1000"); 
    props.put("session.timeout.ms", "30000"); 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
    consumer.subscribe(Arrays.asList("foo", "bar")); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(100); 
     for (ConsumerRecord<String, String> record : records) 
      System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); 
    } 

你可以找到更多信息here

注意:這個問題是它會打開多個連接來找出哪個節點存放數據。對於更健壯和可擴展的系統,您可以維護分區號和節點名稱的地圖,這也有助於加載數據。

這裏是生產樣品

Properties props = new Properties(); 
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092"); 
props.put("acks", "all"); 
props.put("retries", 0); 
props.put("batch.size", 16384); 
props.put("linger.ms", 1); 
props.put("buffer.memory", 33554432); 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

Producer<String, String> producer = new KafkaProducer<>(props); 
for(int i = 0; i < 100; i++) 
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); 

producer.close(); 

更多信息here

+0

如何爲一個主題創建多個分區?如何才能做到這一點?我們不需要像這樣通過ZkClient進行授權嗎?這裏討論:http://stackoverflow.com/questions/27036923/how-to-create-a-topic-in-kafka-through-java – amateur

+0

AdminUtils.createTopic(zkUtils,topicName,noOfPartitions,noOfReplication,topicConfiguration); – amateur

+0

您可以使用AdminUtils創建主題..但更好的做法是在節點本身中創建它,並使用命令調用這是一次性任務。命令格式/bin/kafka-topics.sh --zookeeper c6401.ambari.apache.org:2181 --create --topic test_topic --partitions 2 --replication-factor 2創建主題「test_topic」。 – Shettyh

0

無需通過在卡夫卡的客戶動物園管理員連接屬性(監製&消費者)。

從Kafka-v9及以上版本開始,Kafka Producer和Consumer不與Zookeeper進行通信。

+0

我使用V9,即時獲取以下異常,在屬性中需要zookeeper信息.. – amateur

+0

引起:java.lang.IllegalArgumentException:需求失敗:缺少必需的屬性'zookeeper.connect' \t at scala.Predef $ .require(Predef.scala:233) \t at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177) \t at kafka.utils.ZKConfig。 (ZkUtils.scala:740) – amateur

+0

使用'kafka-clients'庫中的KafkaProducer和KafkaConsumer。 –

相關問題