2017-05-19

I am running into an issue with Kafka and Storm. At this point I am not sure whether it is a problem with the KafkaSpout configuration I am setting up, or whether I am not acking correctly, or something else. Why is the Apache Storm KafkaSpout emitting so many items from the Kafka topic?

I queued 50 items onto my Kafka topic, but my spout has emitted over 1,300 tuples (and counting). Furthermore, the spout reports that almost all of them have "failed". The topology does not actually fail, it writes to the database successfully, but I have no idea why it is apparently replaying everything so many times (if that is what it is doing).

The big question is:

Why is it emitting so many tuples when I only passed 50 to Kafka?


Here is how I set up the topology and the KafkaSpout:

public static void main(String[] args) {
    try {
        String databaseServerIP = "";
        String kafkaZookeepers = "";
        String kafkaTopicName = "";
        int numWorkers = 1;
        int numAckers = 1;
        int numSpouts = 1;
        int numBolts = 1;
        int messageTimeOut = 10;
        String topologyName = "";

        // Check length before indexing, so an empty args array cannot throw
        if (args == null || args.length == 0) {
            System.out.println("Args cannot be null or empty. Exiting");
            return;
        } else {
            if (args.length == 8) {
                for (String arg : args) {
                    if (arg == null) {
                        System.out.println("Parameters cannot be null. Exiting");
                        return;
                    }
                }
                databaseServerIP = args[0];
                kafkaZookeepers = args[1];
                kafkaTopicName = args[2];
                numWorkers = Integer.valueOf(args[3]);
                numAckers = Integer.valueOf(args[4]);
                numSpouts = Integer.valueOf(args[5]);
                numBolts = Integer.valueOf(args[6]);
                topologyName = args[7];
            } else {
                System.out.println("Bad parameters: found " + args.length + ", required = 8");
                return;
            }
        }

        Config conf = new Config();

        conf.setNumWorkers(numWorkers);
        conf.setNumAckers(numAckers);
        conf.setMessageTimeoutSecs(messageTimeOut);

        conf.put("databaseServerIP", databaseServerIP);
        conf.put("kafkaZookeepers", kafkaZookeepers);
        conf.put("kafkaTopicName", kafkaTopicName);

        /**
         * Now would put kafkaSpout instance below instead of TemplateSpout()
         */
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(topologyName + "-flatItems-from-kafka-spout", getKafkaSpout(kafkaZookeepers, kafkaTopicName), numSpouts);
        builder.setBolt(topologyName + "-flatItem-Writer-Bolt", new ItemWriterBolt(), numBolts)
               .shuffleGrouping(topologyName + "-flatItems-from-kafka-spout");

        StormTopology topology = builder.createTopology();

        StormSubmitter.submitTopology(topologyName, conf, topology);

    } catch (Exception e) {
        System.out.println("There was a problem starting the topology. Check parameters.");
        e.printStackTrace();
    }
}

private static KafkaSpout getKafkaSpout(String zkHosts, String topic) throws Exception {

    //String topic = "FLAT-ITEMS";
    String zkNode = "/" + topic + "-subscriber-pipeline";
    String zkSpoutId = topic + "subscriberpipeline";
    KafkaTopicInZkCreator.createTopic(topic, zkHosts);

    SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zkHosts), topic, zkNode, zkSpoutId);
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

    // spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
    //spoutConfig.startOffsetTime = System.currentTimeMillis();
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

    return new KafkaSpout(spoutConfig);
}

And here is how the topic gets created, in case that matters:

public static void createTopic(String topicName, String zookeeperHosts) throws Exception {
    ZkClient zkClient = null;
    ZkUtils zkUtils = null;
    try {
        int sessionTimeOutInMs = 15 * 1000; // 15 secs
        int connectionTimeOutInMs = 10 * 1000; // 10 secs

        zkClient = new ZkClient(zookeeperHosts, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
        zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperHosts), false);

        int noOfPartitions = 1;
        int noOfReplication = 1;
        Properties topicConfiguration = new Properties();

        boolean topicExists = AdminUtils.topicExists(zkUtils, topicName);
        if (!topicExists) {
            AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
        }
    } catch (Exception ex) {
        ex.printStackTrace();
    } finally {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}

Answer

You need to look at whether the messages are failing in the bolt.

If they are all failing, you are probably not acking the message in the bolt, or there is an exception in the bolt code.

If the bolt messages are being acked, it is more likely a timeout. Increasing the topology timeout config or the parallelism should solve the problem.
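Since the question does not show the internals of ItemWriterBolt, here is a hedged sketch of what an explicitly-acking writer bolt looks like (class and method names other than the Storm API are hypothetical; imports assume Storm 1.x, use the backtype.storm packages on 0.10.x and earlier):

```java
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class AckingWriterBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            String item = tuple.getString(0);
            // writeToDatabase(item); // hypothetical placeholder for the DB write
            collector.ack(tuple);    // without this, every tuple times out and is replayed
        } catch (Exception e) {
            collector.fail(tuple);   // an explicit fail triggers an immediate replay
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing to declare
    }
}
```

If a BaseRichBolt like this never calls ack(), Storm will count every tuple as failed once the message timeout expires, which matches the symptom of 50 messages producing 1,300+ emitted tuples.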

Thanks. What is the correct way to ack in the bolt? And how do I increase the topology timeout? – markg

@markg If you are using BaseBasicBolt, you do not need to handle the ack yourself. If you are using BaseRichBolt, you should call ack() in the execute method. – Solo
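For comparison, a sketch of the BaseBasicBolt variant, which acks automatically when execute returns normally (the class name and DB call are hypothetical; imports again assume Storm 1.x):

```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class AutoAckWriterBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // writeToDatabase(tuple.getString(0)); // hypothetical DB write
        // On normal return Storm acks the tuple for you;
        // throwing an exception fails it instead.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing to declare
    }
}
```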

@markg The topology timeout is the "topology.message.timeout.secs" config; you can set it in the topology code or in the supervisors' storm.yaml. – Solo
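In topology code this is the setMessageTimeoutSecs call that main already makes; note the question passes 10 seconds, which is below Storm's default of 30 and makes timeouts more likely if the DB writes are slow. A sketch of a more forgiving configuration (the specific values are illustrative, not a recommendation):

```java
Config conf = new Config();
// Allow slow DB writes more time before Storm replays the tuple
// (equivalent to topology.message.timeout.secs in storm.yaml)
conf.setMessageTimeoutSecs(120);
// Optionally cap in-flight tuples per spout task so the spout
// cannot flood the bolt faster than it can ack
conf.setMaxSpoutPending(500);
```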
