KAFKA + FLINK 1.1.2 consumer group not working as expected

I am trying to consume from a topic that has 3 partitions, with 3 FlinkKafkaConsumer09 instances, and I set the Kafka consumer group properties as follows.

props.setProperty("group.id", "myGroup");     
props.setProperty("auto.offset.reset", "latest"); 

But all 3 consumers still receive all of the data. According to the consumer group concept, each record should be delivered to only one consumer within the group.

It works as expected with a plain Java consumer, though; the problem is specific to FlinkKafkaConsumer09.
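For reference, a minimal sketch of the plain Java (Kafka 0.9) consumer that does balance across the group; the broker address and topic name here are placeholders:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
props.setProperty("group.id", "myGroup");
props.setProperty("auto.offset.reset", "latest");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
// subscribe() registers with Kafka's group coordinator, which balances the
// 3 partitions across the 3 consumers sharing "myGroup"
consumer.subscribe(Collections.singletonList("myTopic")); // placeholder topic
while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        // each record arrives at exactly one consumer in the group
    }
}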

Answer


This can be worked around by writing your own consumer on top of Flink's. (Flink's Kafka connector assigns partitions to its parallel subtasks itself instead of going through Kafka's group coordinator, so group.id has no effect on partition assignment there.)

Step 1: pass a "partitions" property to the Flink consumer, telling each instance which single partition to read (see the usage sketch after the class below).

Issue: with this approach you end up with exactly one consumer per partition.

public class YourConsumer<T> extends FlinkKafkaConsumerBase<T> 
{ 
    private static final Logger LOG = LoggerFactory.getLogger(YourConsumer.class); 

    public static final long DEFAULT_POLL_TIMEOUT = 100L; 

    // same configuration keys as Flink's FlinkKafkaConsumer09 
    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout"; 
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; 

    private final Properties properties; 

    private final long pollTimeout; 

    public YourConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { 
     this(Collections.singletonList(topic), valueDeserializer, props); 
    } 


    public YourConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) { 
     this(Collections.singletonList(topic), deserializer, props); 
    } 


    public YourConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) { 
     this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); 
    } 

    public YourConsumer(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) { 
     super(topics, deserializer); 

     this.properties = checkNotNull(props, "props"); 
     setDeserializer(this.properties); 

     // configure the polling timeout 
     try { 
      if (properties.containsKey(KEY_POLL_TIMEOUT)) { 
       this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT)); 
      } else { 
       this.pollTimeout = DEFAULT_POLL_TIMEOUT; 
      } 
     } 
     catch (Exception e) { 
      throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e); 
     } 
    } 

    @Override 
    protected AbstractFetcher<T, ?> createFetcher(
      SourceContext<T> sourceContext, 
      List<KafkaTopicPartition> thisSubtaskPartitions, 
      SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, 
      SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, 
      StreamingRuntimeContext runtimeContext) throws Exception { 

     boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false")); 

     return new Kafka09Fetcher<>(sourceContext, thisSubtaskPartitions, 
       watermarksPeriodic, watermarksPunctuated, 
       runtimeContext, deserializer, 
       properties, pollTimeout, useMetrics); 
    } 

    @Override 
    protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) { 
     // read the partitions that belong to the listed topics 
     final List<KafkaTopicPartition> partitions = new ArrayList<>(); 
     // index of the single partition this instance should read, passed in via the custom "partitions" property 
     int partition = Integer.parseInt(this.properties.getProperty("partitions")); 
     try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) { 
      for (final String topic: topics) { 
       // get partitions for each topic 
       List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic); 
       // for non existing topics, the list might be null. 
       if (partitionsForTopic != null) { 
        partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic, partition)); 
       } 
      } 
     } 

     if (partitions.isEmpty()) { 
      throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics); 
     } 

     // unlike the stock consumer, each parallel instance now sees only the single partition it was configured for. 
     LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics); 

     if (LOG.isInfoEnabled()) { 
      logPartitionInfo(LOG, partitions); 
     } 

     return partitions; 
    } 


    private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions, int partition) { 
     checkNotNull(partitions); 

     // keep only the one partition this instance was configured with, 
     // instead of converting the whole partition list 
     List<KafkaTopicPartition> ret = new ArrayList<>(1); 
     ret.add(new KafkaTopicPartition(partitions.get(partition).topic(), partitions.get(partition).partition())); 
     return ret; 
    } 


    private static void setDeserializer(Properties props) { 
     final String deSerName = ByteArrayDeserializer.class.getCanonicalName(); 

     Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); 
     Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); 

     if (keyDeSer != null && !keyDeSer.equals(deSerName)) { 
      LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); 
     } 
     if (valDeSer != null && !valDeSer.equals(deSerName)) { 
      LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); 
     } 

     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName); 
    } 
}
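
A minimal sketch of how this could be wired into a job, assuming each of the 3 job instances is started with a different value for the custom "partitions" property (broker address and topic name are placeholders):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
props.setProperty("group.id", "myGroup");
props.setProperty("auto.offset.reset", "latest");
// custom property read by YourConsumer: this instance reads only partition 0;
// the other two instances would be started with "1" and "2"
props.setProperty("partitions", "0");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new YourConsumer<>("myTopic", new SimpleStringSchema(), props)) // placeholder topic
   .print();
env.execute();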