
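AdminUtils.createTopic API throws kafka.admin.AdminOperationException

I am using the Confluent 3.0.1 platform on Windows. I followed the installation guide and the developer guide to install everything and develop my topology.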

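I started ZooKeeper, then the Kafka server, and tried to run my topology, but an error appears on the Kafka server. Even if I create the topic manually and then run the topology, I see the same error.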

INFO Topic creation {"version":1,"partitions":{"0":[0]}} (kafka.admin.AdminUtils$) 
[2016-09-21 17:20:08,807] INFO [KafkaApi-0] Auto creation of topic Text4 with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis) 
[2016-09-21 17:20:09,436] ERROR [KafkaApi-0] Error when handling request {group_id=my-first-streams-application1} (kafka.server.KafkaApis) 
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1 
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117) 
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403) 
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createTopic(KafkaApis.scala:629) 
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$createGroupMetadataTopic(KafkaApis.scala:651) 
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657) 
    at kafka.server.KafkaApis$$anonfun$getOrCreateGroupMetadataTopic$1.apply(KafkaApis.scala:657) 
    at scala.Option.getOrElse(Option.scala:121) 
    at kafka.server.KafkaApis.getOrCreateGroupMetadataTopic(KafkaApis.scala:657) 
    at kafka.server.KafkaApis.handleGroupCoordinatorRequest(KafkaApis.scala:818) 
    at kafka.server.KafkaApis.handle(KafkaApis.scala:86) 
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) 
    at java.lang.Thread.run(Thread.java:745) 

My topology code is as follows:

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public class MetricTopology implements InitializingBean {

    @Autowired
    @Qualifier("getStreamsConfig")
    private Properties properties;

    // Builds the topology and starts the streams application.
    public void buildTopology() {
        System.out.println("MetricTopology.buildTopology()");
        TopologyBuilder builder = new TopologyBuilder();
        // Add the source processor node that takes Kafka topic "Text4" as input.
        builder.addSource("Source", "Text4")
                // Add the MetricsProcessor node, with the source processor as its upstream processor.
                .addProcessor("Process", () -> new MetricsProcessor(), "Source");
        // Build and start the stream.
        KafkaStreams streams = new KafkaStreams(builder, properties);
        streams.start();
    }

    // Called by Spring after all properties are set.
    @Override
    public void afterPropertiesSet() throws Exception {
        buildTopology();
    }

}

Below are some of the properties, which I set in a different Java source file.

Properties settings = new Properties(); 
// Set a few key parameters. These properties will be picked up from a property file.
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application1"); 
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); 
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
settings.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1"); 

When running the application, I got the following error on the application side: [StreamThread-1] org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 30 : {Text4=LEADER_NOT_AVAILABLE} – Renukaradhya

Answer

kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 1 

To create a topic with a replication factor of 3, you need at least 3 running brokers.
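
The rule behind that message can be sketched in plain Java. This is an illustrative helper, not Kafka's own code: the class name ReplicationCheck and the method requireEnoughBrokers are hypothetical, and only the message wording is taken from the log in the question. Note also that the stack trace passes through createGroupMetadataTopic, which suggests the topic being created is the internal consumer offsets topic rather than Text4. Assuming that is the case, its replication factor would come from the broker setting offsets.topic.replication.factor (default 3), not from StreamsConfig.REPLICATION_FACTOR_CONFIG.

```java
import java.util.Arrays;
import java.util.List;

public class ReplicationCheck {

    // Hypothetical helper (not part of Kafka): rejects a replication factor
    // that cannot be satisfied by the brokers currently alive.
    public static void requireEnoughBrokers(int replicationFactor, List<Integer> aliveBrokerIds) {
        if (replicationFactor <= 0) {
            throw new IllegalArgumentException("replication factor must be larger than 0");
        }
        if (replicationFactor > aliveBrokerIds.size()) {
            throw new IllegalStateException("replication factor: " + replicationFactor
                    + " larger than available brokers: " + aliveBrokerIds.size());
        }
    }

    public static void main(String[] args) {
        // A single-broker setup like the one in the question (broker id 0).
        List<Integer> singleBroker = Arrays.asList(0);

        // Replication factor 1 fits on one broker, so this passes silently.
        requireEnoughBrokers(1, singleBroker);

        // Replication factor 3 cannot be placed on one broker.
        try {
            requireEnoughBrokers(3, singleBroker);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

On a single-broker development machine, the practical options are either to start more brokers or to lower the replication factor that applies to the topic being created.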
