0

我是卡夫卡和風暴的新手。我試圖實現一個集成Kafka和風暴的java示例。我在網上找到了一個例子。我正在嘗試在eclipse IDE中運行java程序。我沒有使用maven。Kafka Storm與java集成。 kafka.api.OffsetRequest.DefaultClientId()Ljava /郎/字符串;錯誤

我有storm-kafka-0.10.0.jar,kafka-0.6.jar,scala-library-2.10.3.jarstorm-core-0.10.0.jar作爲外部罐子。

這是我的java代碼。

KafkaStormSample.java

import backtype.storm.Config; 
import backtype.storm.LocalCluster; 
import backtype.storm.topology.TopologyBuilder; 

import java.util.UUID; 

import backtype.storm.spout.SchemeAsMultiScheme; 
import storm.kafka.ZkHosts; 
import storm.kafka.BrokerHosts; 
import storm.kafka.SpoutConfig; 
import storm.kafka.KafkaSpout; 
import storm.kafka.StringScheme; 

public class KafkaStormSample { 
    public static void main(String[] args) throws Exception{ 
     Config config = new Config(); 
     config.setDebug(true); 
     config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); 
     String zkConnString = "localhost:2181"; 
     String topic = "my-first-topic"; 
     BrokerHosts hosts = new ZkHosts(zkConnString); 

     SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic,  
     UUID.randomUUID().toString()); 
     kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4; 
     kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4; 
    //kafkaSpoutConfig.forceFromStart = true; 
     kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

     TopologyBuilder builder = new TopologyBuilder(); 
     builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig)); 
    //builder.setBolt("word-spitter", new SplitBolt()).shuffleGrouping("kafka-spout"); 
     builder.setBolt("word-counter", new CountBolt()).shuffleGrouping("word-spitter"); 

     LocalCluster cluster = new LocalCluster(); 
     cluster.submitTopology("KafkaStormSample", config, builder.createTopology()); 

     Thread.sleep(10000); 

     cluster.shutdown(); 
    } 
} 

CountBolt.java

import java.util.Map; 
import java.util.HashMap; 

import backtype.storm.tuple.Tuple; 
import backtype.storm.task.OutputCollector; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.IRichBolt; 
import backtype.storm.task.TopologyContext; 

public class CountBolt implements IRichBolt{ 
    Map<String, Integer> counters; 
    private OutputCollector collector; 

    @Override 
    public void prepare(Map stormConf, TopologyContext context, 
    OutputCollector collector) { 
     this.counters = new HashMap<String, Integer>(); 
     this.collector = collector; 
    } 

    @Override 
    public void execute(Tuple input) { 
     String str = input.getString(0); 

     if(!counters.containsKey(str)){ 
     counters.put(str, 1); 
     }else { 
     Integer c = counters.get(str) +1; 
     counters.put(str, c); 
     } 

     collector.ack(input); 
    } 

    @Override 
    public void cleanup() { 
     for(Map.Entry<String, Integer> entry:counters.entrySet()){ 
     System.out.println(entry.getKey()+" : " + entry.getValue()); 
     } 
    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 

    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; 
    } 
} 

當我嘗試運行kafkaStormSample.java我不斷收到下面的錯誤。

Exception in thread "main" java.lang.NoSuchMethodError: kafka.api.OffsetRequest.DefaultClientId()Ljava/lang/String; 
    at storm.kafka.KafkaConfig.<init>(KafkaConfig.java:43) 
    at storm.kafka.SpoutConfig.<init>(SpoutConfig.java:40) 
    at KafkaStormSample.main(KafkaStormSample.java:23) 

我確定我有所有需要的罐子。但我仍然認爲我缺少jar。

任何幫助,將不勝感激。

謝謝!

回答

0

我對這些系統知之甚少,但它看起來像是一個庫版本不匹配。

其中一個庫(在thids案中的Storm)是針對定義該方法的不同版本的kafka編譯的。檢查你的依賴關係。

是依賴管理系統有幫助的原因之一。

更新: 從他們的文件他們提供這種建立在Maven的:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.8.1.1</version> 
     <exclusions> 
      <exclusion> 
       <groupId>org.apache.zookeeper</groupId> 
       <artifactId>zookeeper</artifactId> 
      </exclusion> 
      <exclusion> 
       <groupId>log4j</groupId> 
       <artifactId>log4j</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 

看來你的卡夫卡版本太舊。

相關問題