2017-10-18 339 views
0

我正在使用Storm版本1.1.0和卡夫卡版本0.10.1.2。卡夫卡噴口錯誤「消費者沒有訂閱任何主題或分配任何分區」

我創造卡夫卡壺嘴如下:

public KafkaSpout<String, String> getKafkaSpout() { 
    String _kafkaBrokers = (String) props.get("bootstrap.servers"); 
    String _topic = (String) props.get("kafka.topic.name"); 
    String groupId = (String) props.get("group.id"); 
    int maxMsgSize = (int) props.get("fetch.message.max.bytes"); 
    String keySerializer = (String) props.get("key.serializer"); 
    String valueSerializer = (String) props.get("value.serializer"); 

    List<String>topics = new ArrayList<String>(`enter code here`); 
    topics.add(_topic); 

    return new KafkaSpout<String, String (KafkaSpoutConfig.builder(_kafkaBrokers, topics) 
      .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST) 
      .setMaxUncommittedOffsets(100) 
      .setProp(ConsumerConfig.GROUP_ID_CONFIG, groupId) 
      .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,maxMsgSize) 
      .setProp("key.serializer",keySerializer) 
      .setProp("value.serializer",valueSerializer) 
      .build()) 
} 

我與我的項目已經下文提到的Maven依賴其他依賴相處下文提到的錯誤

java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions 
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:973) 
at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:291) 
at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:225) 
at org.apache.storm.daemon.executor$fn__9798$fn__9813$fn__9844.invoke(executor.clj:647) 
at org.apache.storm.util$async_loop$fn__555.invoke(util.clj:484) 
at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745) 

<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-kafka-client</artifactId> 
    <version>1.1.0.2.6.2.0-205</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.storm</groupId> 
    <artifactId>storm-kafka</artifactId> 
    <version>1.1.0.2.6.2.0-205</version> 
</dependency> 

回答

0

我會假設List<String>topics = new ArrayList<String>("enter code here");是你的問題?您可能需要在該列表中輸入您的主題名稱。

您的依賴版本很奇怪,AFAIK Storm還沒有發佈任何與這些版本字符串。

我還想知道爲什麼你需要兩個卡夫卡風暴卡夫卡客戶端,這是卡夫卡> 0.10羣集和風暴卡夫卡,這是爲老卡夫卡羣集(但仍然兼容最新的卡夫卡在此刻我認爲)。

相關問題