2017-01-30

Storm Kafka-Spout not working properly

General: I am a student who wants to run some performance tests (WordCount) on Storm/Kafka/Flink/MS Azure SA/Spark, and I want to use a Kafka broker as the input source.

I took the WordCount example from the storm-starter project and added Kafka as the spout:

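import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountKafkaTopology { 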
    public static class SplitSentence extends ShellBolt implements IRichBolt { 

     public SplitSentence() { 
      super("python", "splitsentence.py"); 
     } 

     @Override 
     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
      declarer.declare(new Fields("word")); 
     } 

     @Override 
     public Map<String, Object> getComponentConfiguration() { 
      return null; 
     } 
    } 

    public static class WordCount extends BaseBasicBolt { 
     Map<String, Integer> counts = new HashMap<String, Integer>(); 

     @Override 
     public void execute(Tuple tuple, BasicOutputCollector collector) { 
      String word = tuple.getString(0); 
      Integer count = counts.get(word); 
      if (count == null) 
       count = 0; 
      count++; 
      counts.put(word, count); 
      collector.emit(new Values(word, count)); 
     } 

     @Override 
     public void declareOutputFields(OutputFieldsDeclarer declarer) { 
      declarer.declare(new Fields("word", "count")); 
     } 
    } 

    public static void main(String[] args) { 

     String zkIp = "localhost"; 

     String topicName = "perfTest"; 

     List<String> nimbus_seeds = new ArrayList<String>(); 
     nimbus_seeds.add("localhost"); 

     String zookeeperHost = zkIp +":2181"; 

     ZkHosts zkHosts = new ZkHosts(zookeeperHost); 

     SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, "/" + topicName, topicName); 
     kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
     KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig); 


     TopologyBuilder builder = new TopologyBuilder(); 

     builder.setSpout("kafkaPerfTestSpout", kafkaSpout, 8); 

     builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("kafkaPerfTestSpout"); 
     builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); 

     Config config = new Config(); 

     config.setMaxTaskParallelism(5); 
     config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2); 
     config.put(Config.NIMBUS_SEEDS, nimbus_seeds); 
     config.put(Config.NIMBUS_THRIFT_PORT, 6627); 
     config.put(Config.STORM_ZOOKEEPER_PORT, 2181); 
     config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp)); 

     try { 
      StormSubmitter.submitTopology("my-kafka-topology", config, builder.createTopology()); 
     } catch (Exception e) { 
      throw new IllegalStateException("Couldn't initialize the topology", e); 
     } 
    } 

} 
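The in-memory HashMap in WordCount only gives correct totals because of fieldsGrouping("split", new Fields("word")): Storm sends every tuple with the same "word" value to the same count task. The sketch below models that routing in plain Java with no Storm dependency. It assumes splitsentence.py splits on single spaces, and chooseTask is a hypothetical stand-in for Storm's internal grouper (a hash of the grouping values modulo the number of target tasks); the exact hash Storm uses may differ. Five tasks are used because config.setMaxTaskParallelism(5) should cap the "count" bolt's parallelism hint of 12.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldsGroupingSketch {

    // Hypothetical stand-in for Storm's fields grouping: choose a target task
    // from a hash of the grouping field values, modulo the number of tasks.
    static int chooseTask(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    // Split each sentence on single spaces (assumed to match splitsentence.py),
    // route every word to one of numTasks counters and count it there.
    static List<Map<String, Integer>> countPerTask(List<String> sentences, int numTasks) {
        List<Map<String, Integer>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(new HashMap<>());
        }
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                int task = chooseTask(Arrays.asList((Object) word), numTasks);
                // Same word -> same task -> one consistent running count.
                perTask.get(task).merge(word, 1, Integer::sum);
            }
        }
        return perTask;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> result =
            countPerTask(Arrays.asList("the cow jumped", "the moon"), 5);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("task " + i + ": " + result.get(i));
        }
    }
}
```

With a shuffle grouping instead, "the" could land on two different count tasks, and each would report a partial count.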

When I run the topology, I get several error messages. The spout reports:

java.lang.ExceptionInInitializerError
    at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:89)
    at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
    at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:35)
    at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:47)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:60)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:60)
    at kafka.utils.Pool.getAndMaybePut(Pool.scala:59)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:64)
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:44)
    at kafka.javaapi.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:34)
    at org.apache.storm.kafka.DynamicPartitionConnections.register(DynamicPartitionConnections.java:60)
    at org.apache. ...
    at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98)
    at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129)
    at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Shutdown in progress
    at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
    at java.lang.Runtime.addShutdownHook(Runtime.java:211)
    at com.yammer.metrics.Metrics.<clinit>(Metrics.java:21)
    ... 19 more

In the split bolt:

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: pid:3973, name:split exitCode:0, errorString: 
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73)
    at org.apache.storm.daemon.executor$fn__8058$fn__8071$fn__8124.invoke(executor.clj:850)
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: pid:3973, name:split exitCode:0, errorString: 
    at org.apache.storm.task.ShellBolt.execute(ShellBolt.java:150)
    at org.apache.storm.daemon.executor$fn__8058$tuple_action_fn__8060.invoke(executor.clj:731)
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464)
    at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
    ... 6 more
Caused by: java.lang.RuntimeException: pid:3973, name:split exitCode:0, errorString: 
    at org.apache.storm.task.ShellBolt.die(ShellBolt.java:295)
    at org.apache.storm.task.ShellBolt.access$400(ShellBolt.java:70)
    at org.apache.storm.task.ShellBolt$BoltWriterRunnable.run(ShellBolt.java:398)
    ... 1 more
Caused by: java.io.IOException: 
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
    at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:297)
    at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
    at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
    at java.io.BufferedWriter.flush(BufferedWriter.java:254)
    at org.apache.storm.multilang.JsonSerializer.writeString(JsonSerializer.java:99)
    at org.apache.storm.multilang.JsonSerializer.writeMessage(JsonSerializer.java:93)
    at org.apache.storm.multilang.JsonSerializer.writeBoltMsg(JsonSerializer.java:78)
    at org.apache.storm.utils.ShellProcess.writeBoltMsg(ShellProcess.java:127)
    at org.apache.storm.task.ShellBolt$BoltWriterRunnable.run(ShellBolt.java:387)
    ... 1 more

I am using kafka-console-producer to generate some messages. I hope someone can help me; I am a newbie at programming Storm...

Answer


Removing "config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);" did the job!
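A likely reason this helps (a hedged reading, not confirmed in the thread): with TOPOLOGY_TICK_TUPLE_FREQ_SECS set in the topology Config, Storm delivers tick tuples to every bolt. They come from component "__system" on stream "__tick" and carry an integer, not a sentence. WordCount.execute calls tuple.getString(0) on that value, and the "split" ShellBolt forwards it to splitsentence.py, which presumably fails when it tries to split a non-string; a dead subprocess would be consistent with the IOException thrown while ShellBolt writes to it. If you do need ticks, a bolt can skip them. In a real Storm bolt you would compare tuple.getSourceComponent() and tuple.getSourceStreamId() with Constants.SYSTEM_COMPONENT_ID and Constants.SYSTEM_TICK_STREAM_ID. The sketch below avoids a Storm dependency, so SimpleTuple is a hypothetical stand-in for Tuple and the two constant strings are copied in by hand.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TickTupleGuardSketch {

    // Same values as Storm's Constants.SYSTEM_COMPONENT_ID and
    // Constants.SYSTEM_TICK_STREAM_ID, copied so the sketch needs no Storm jar.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // Hypothetical minimal stand-in for org.apache.storm.tuple.Tuple.
    static final class SimpleTuple {
        final String sourceComponent;
        final String sourceStreamId;
        final List<Object> values;

        SimpleTuple(String sourceComponent, String sourceStreamId, Object... values) {
            this.sourceComponent = sourceComponent;
            this.sourceStreamId = sourceStreamId;
            this.values = Arrays.asList(values);
        }
    }

    static boolean isTickTuple(SimpleTuple t) {
        return SYSTEM_COMPONENT_ID.equals(t.sourceComponent)
            && SYSTEM_TICK_STREAM_ID.equals(t.sourceStreamId);
    }

    final Map<String, Integer> counts = new HashMap<>();

    // Mirrors WordCount.execute, but returns early on tick tuples instead of
    // treating their integer payload as a word.
    void execute(SimpleTuple t) {
        if (isTickTuple(t)) {
            return; // a real bolt could flush or emit periodic results here
        }
        String word = (String) t.values.get(0);
        counts.merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        TickTupleGuardSketch bolt = new TickTupleGuardSketch();
        bolt.execute(new SimpleTuple("split", "default", "storm"));
        bolt.execute(new SimpleTuple(SYSTEM_COMPONENT_ID, SYSTEM_TICK_STREAM_ID, 2));
        bolt.execute(new SimpleTuple("split", "default", "storm"));
        System.out.println(bolt.counts); // prints {storm=2}
    }
}
```

Another option worth checking in the Storm documentation is to request ticks only for the bolt that needs them, by returning TOPOLOGY_TICK_TUPLE_FREQ_SECS from that bolt's getComponentConfiguration() instead of putting it in the topology-wide Config.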