2017-03-09 85 views
0

當使用camel-kafka時,是否有辦法a)調用默認的Kafka分區器,和/或b)確定分區算法中使用的主題的分區數量?在駱駝處理器目前,我有使用駱駝卡夫卡時是否可以訪問卡夫卡分區的數量?

exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, partition); 

但不知道多少個分區怎麼都可以,我不能選擇partition值。我可以用駱駝做這個,還是我需要「去土生土長」並調用KafkaProducer.partitionsFor

回答

1

我找到了答案。我可以通過省略設置KafkaConstants.PARTITION_KEY標題來調用默認分區程序。

我可以將它指定爲駱駝選擇使用分區程序的自定義實現:

from(...) 
    ... 
    .to("kafka:{{kafka.host}}:{{kafka.port}}" 
     + "?topic={{kafka.topic}}" 
     + "&partitioner=my.package.MyPartitioner"); 

和MyPartitioner我可以通過cluster參數分區信息:

@Override 
public int partition(String topic, 
    Object key, byte[] keyBytes, Object value, byte[] valueBytes, 
    Cluster cluster) { 

    List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic); 

    // use partitions.size()... 

    return ...  
}