2016-09-21 66 views
0
<!-- Apache Storm core API, version 1.0.2. Maven's "provided" scope keeps it on the compile
     classpath but out of the packaged artifact; presumably the Storm cluster supplies it at
     runtime (confirm this also works for the local-cluster run mode). -->
<dependency> 
     <groupId>org.apache.storm</groupId> 
     <artifactId>storm-core</artifactId> 
     <version>1.0.2</version> 
     <scope>provided</scope> 
</dependency> 
<!-- Kafka client library, version 0.10.0.0, bundled with the application ("compile" scope). -->
<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.0.0</version> 
     <scope>compile</scope> 
</dependency> 

我使用的是 apache/storm/external/storm-kafka-client 中基於新版 Kafka Consumer API 的 Storm Kafka Spout(storm-kafka-client)。我的拓撲結構如下所示。問題:如何解決 Storm Kafka Spout 只消費到 Kafka 中一半數據的情況?

/**
 * Builds a Storm topology that reads the {@code "online"} Kafka topic through
 * {@code KafkaSpout} and pushes the records through a JSON-parsing bolt, three
 * windowed statistics bolts and a batching writer bolt.
 *
 * <p>Run without arguments to submit to an in-process {@code LocalCluster};
 * run with one argument to submit to a remote cluster under that topology name.
 */
public class AnalyseTopo {
    private static final Logger LOG = LoggerFactory.getLogger(AnalyseTopo.class);

    /** Stream ids emitted by the Kafka spout; only the first one is used. */
    private static final String[] STREAMS = new String[]{"test_stream"};
    /** Kafka topics to consume; only the first one is used. */
    private static final String[] TOPICS = new String[]{"online"};

    public static void main(String[] args) throws Exception {
        new AnalyseTopo().runMain(args);
    }

    /**
     * Submits the topology locally when no arguments are given, otherwise remotely.
     *
     * @param args command-line arguments; {@code args[0]}, if present, is the remote topology name
     * @throws Exception if submission fails
     */
    protected void runMain(String[] args) throws Exception {
        if (args.length == 0) {
            submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
        } else {
            submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig());
        }
    }

    /**
     * Submits the topology to a new {@code LocalCluster} under the name {@code "KafkaTest"}
     * and then blocks until ENTER is pressed.
     *
     * <p>NOTE(review): the declared exception is {@code InterruptException}; this may have been
     * meant to be {@code java.lang.InterruptedException} - confirm against the file's imports.
     *
     * @param topology topology to run
     * @param config   topology configuration
     */
    protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptException {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaTest", config, topology);
        stopWaitingForInput();
    }

    /**
     * Submits the topology to a remote cluster via {@code StormSubmitter}.
     *
     * @param arg      topology name
     * @param topology topology to run
     * @param config   topology configuration
     * @throws Exception if submission fails
     */
    protected void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception {
        StormSubmitter.submitTopology(arg, config, topology);
    }

    /**
     * Blocks until a line is read from standard input, then terminates the JVM.
     * If reading fails, the error is logged and the method returns without exiting.
     */
    protected void stopWaitingForInput() {
        try {
            System.out.println("PRESS ENTER TO STOP Now");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
            System.exit(0);
        } catch (IOException e) {
            // Report through the class logger instead of printStackTrace().
            LOG.error("Failed to read from standard input while waiting to stop", e);
        }
    }

    /**
     * Wires the spout and bolts together.
     *
     * @return the assembled topology
     */
    protected StormTopology getTopologyKafkaSpout() {
        final TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);

        // 1. Parse each log record (with fastjson) first.
        builder.setBolt("json_parse", new JsonParseBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);

        // 2. Compute the per-app/channel statistics over a 60 s tumbling window.
        //    The same 60 s duration is used for lag, watermark interval and window length.
        Duration oneMinute = new Duration(60, TimeUnit.SECONDS);
        IWindowedBolt appChannelBolt = new AppChannelStatBolt()
                .withTimestampField("timestamp")
                .withLag(oneMinute)
                .withWatermarkInterval(oneMinute)
                .withTumblingWindow(oneMinute);
        // Grouped on "timestamp" (previously grouped on app_channel).
        builder.setBolt("app_channel", appChannelBolt, 3)
                .fieldsGrouping("json_parse", new Fields("timestamp"));

        // 3. Feed those statistics into the overall per-app and per-channel statistics.
        IWindowedBolt appStatBolt = new AppStatBolt()
                .withTimestampField("timestamp")
                .withLag(oneMinute)
                .withWatermarkInterval(oneMinute)
                .withTumblingWindow(oneMinute);
        builder.setBolt("app_stat", appStatBolt, 1)
                .fieldsGrouping("app_channel", "stat", new Fields("appid"));

        IWindowedBolt channelStatBolt = new ChannelStatBolt()
                .withTimestampField("timestamp")
                .withLag(oneMinute)
                .withWatermarkInterval(oneMinute)
                .withTumblingWindow(oneMinute);
        builder.setBolt("channel_stat", channelStatBolt, 1)
                .fieldsGrouping("app_channel", "stat", new Fields("channel"));

        // 4. Persist to MySQL, batching every 10 tuples from the "sql" streams.
        IWindowedBolt batchWriteBolt = new BatchWriteBolt()
                .withTumblingWindow(new BaseWindowedBolt.Count(10));
        builder.setBolt("batch_write", batchWriteBolt, 1)
                .shuffleGrouping("app_channel", "sql")
                .shuffleGrouping("app_stat", "sql")
                .shuffleGrouping("channel_stat", "sql");

        return builder.createTopology();
    }

    /**
     * Creates the topology configuration: debug enabled and a message timeout of 1000 seconds.
     *
     * @return the topology configuration
     */
    protected Config getConfig() {
        Config config = new Config();
        config.setDebug(true);
        // Use the Config constant rather than the raw "topology.message.timeout.secs" key.
        config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000);
        return config;
    }

    /**
     * Builds the spout configuration: offsets are committed every 2000 ms, the first poll uses
     * {@code UNCOMMITTED_EARLIEST}, at most 50000 offsets may be uncommitted, and the poll
     * timeout is 2000 ms.
     *
     * @param kafkaSpoutStreams stream declaration used by the spout
     * @return the spout configuration
     */
    protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
        return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
                .setOffsetCommitPeriodMs(2000)
                .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
                .setMaxUncommittedOffsets(50000)
                .setPollTimeoutMs(2000)
                .build();
    }

    /**
     * Creates the exponential-backoff retry service for failed tuples.
     *
     * @return the retry service
     */
    protected KafkaSpoutRetryService getRetryService() {
        // Arguments in order: 500 us, 2 us, 35, 10 s. Presumably initial delay, delay period,
        // max retries (reduced from Integer.MAX_VALUE) and max delay - confirm against the
        // KafkaSpoutRetryExponentialBackoff constructor.
        return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                TimeInterval.microSeconds(2), 35, TimeInterval.seconds(10));
    }

    /**
     * Builds the Kafka consumer properties: bootstrap servers, group id {@code "storm2"} and
     * String key/value deserializers.
     *
     * @return mutable map of consumer properties
     */
    protected Map<String, Object> getKafkaConsumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "cstr-01:9092,cstr-02:9092,cstr-03:9092");
        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "storm2");
        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
        // Auto-commit is intentionally left disabled here:
        //props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
        //props.put(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS, "5000");

        // Candidate settings tried for resolving commit failures (currently disabled):
        //props.put("session.timeout.ms", "50000"); //increase
        //props.put("max.poll.records", "50000");  //reduce
        return props;
    }

    /**
     * Creates the tuples builder, using {@code OnlineTupleBuilder} for the first topic.
     *
     * @return the tuples builder
     */
    protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<String, String>(
                new OnlineTupleBuilder<>(TOPICS[0]))
                .build();
    }

    /**
     * Declares the spout's output: fields {@code topic, partition, offset, value} on the first
     * stream id, for the first topic.
     *
     * @return the spout stream declaration
     */
    protected KafkaSpoutStreams getKafkaSpoutStreams() {
        final Fields outputFields = new Fields("topic", "partition", "offset", "value");
        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0]})
                .build();
    }

}

當我修改 KafkaSpout.java 以打印 consumerRecords 的偏移量(offset)時,我發現有些偏移量被跳過了。被跳過的偏移量截圖:http://7xtjbx.com1.z0.glb.clouddn.com/stack.png

我該如何解決這個問題?使用新版 Consumer API 的 storm-kafka-client 是否存在一些問題?謝謝!

回答

0

我改用自動提交(auto commit)後解決了這個問題。

// Workaround proposed by the answer: turn on the consumer's auto-commit setting with an
// interval value of "5000" (presumably milliseconds, going by the constant's name).
props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true"); 
props.put(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS, "5000"); 

你可能會遇到 NullPointerException,用 try/catch 捕獲它即可。另外,你需要把 KafkaSpout.java 第 297 行附近的 numUncommittedOffsets++ 去掉。