2016-07-26 66 views
0

我試圖從代碼中將分區數設置爲2,並且我有單節點設置(1 zookeeper,1kafka)。當我使用消息時,我看到kafka只使用一個分區來存儲數據,我是否需要對安裝程序進行任何修改以使其具有多個分區?在Apache Kafka中設置多個分區

private void setupZookeeper(String[] topicList){ 

    ZkClient zkClient = null; 
    ZkUtils zkUtils = null; 
    try { 
     String[] zookeeperHosts = {"localhost:2181"}; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181"; 
     int sessionTimeOutInMs = 15 * 1000; // 15 secs 
     int connectionTimeOutInMs = 10 * 1000; // 10 secs 
     //String topicName = "testTopic"; 
     int noOfPartitions = 2; 
     int noOfReplication = 1; 

     for(String zookeeper:zookeeperHosts){ 

      zkClient = new ZkClient(zookeeper, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$); 
      zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeper), false); 
      for(String topicName: topicList){ 
       System.out.println("Setting no of partitions ="+noOfPartitions + "for topic" + topicName); 
       AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, 
         producerConfig(),RackAwareMode.Disabled$.MODULE$); 
      } 
     } 



    } catch (Exception ex) { 
     ex.printStackTrace(); 
    } finally { 
     if (zkClient != null) { 
      zkClient.close(); 
     } 
    } 

我producerConfig,如下所示:

private Properties producerConfig() { 
    Properties props = new Properties(); 
    props.put("bootstrap.servers", "localhost:9092"); 

    props.put("acks", "all"); 
    //props.put("retries", 0); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

return props; 
} 

回答

1

當我使用該消息我看到卡夫卡僅使用一個 分區來存儲數據

的默認消息分區策略如下,「只有一個使用的分區」可能是由常量消息密鑰造成的,計算的相同散列值和路由到Ÿ一個分區。

  • 如果在記錄中指定了分區,請使用它;
  • 如果未指定分區但存在密鑰,請根據密鑰的散列值選擇分區;
  • 如果沒有分區或密鑰存在,請以循環方式選擇一個分區。

you