2016-09-22 193 views
3

我有一個卡夫卡經紀人與多個主題,每個擁有一個單一的分區。卡夫卡消費者與JAVA

我有消費者認爲只是正常從主題

我的問題是我需要通過增加分區的數量,以提高通過消息隊列的放消耗的消息,說我有一個四個分區主題,有沒有辦法,我可以寫四個消費者,每個指向個人分區的主題?

import java.util.*; 
import kafka.consumer.Consumer; 
import kafka.consumer.ConsumerConfig; 
import kafka.consumer.ConsumerIterator; 
import kafka.consumer.KafkaStream; 
import kafka.javaapi.consumer.ConsumerConnector; 

public class KafkaConsumer { 
    private ConsumerConnector consumerConnector = null; 
    private final String topic = "mytopic"; 

    public void initialize() { 
     Properties props = new Properties(); 
     props.put("zookeeper.connect", "localhost:2181"); 
     props.put("group.id", "testgroup"); 
     props.put("zookeeper.session.timeout.ms", "400"); 
     props.put("zookeeper.sync.time.ms", "300"); 
     props.put("auto.commit.interval.ms", "1000"); 
     ConsumerConfig conConfig = new ConsumerConfig(props); 
     consumerConnector = Consumer.createJavaConsumerConnector(conConfig); 
    } 

    public void consume() { 
     //Key = topic name, Value = No. of threads for topic 
     Map<String, Integer> topicCount = new HashMap<String, Integer>();  
     topicCount.put(topic, new Integer(1)); 

     //ConsumerConnector creates the message stream for each topic 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = 
       consumerConnector.createMessageStreams(topicCount);   

     // Get Kafka stream for topic 'mytopic' 
     List<KafkaStream<byte[], byte[]>> kStreamList = 
               consumerStreams.get(topic); 
     // Iterate stream using ConsumerIterator 
     for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) { 
       ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator(); 

       while (consumerIte.hasNext()) 
         System.out.println("Message consumed from topic 
            [" + topic + "] : "  + 
             new String(consumerIte.next().message()));    
     } 
     //Shutdown the consumer connector 
     if (consumerConnector != null) consumerConnector.shutdown();   
    } 

    public static void main(String[] args) throws InterruptedException { 
     KafkaConsumer kafkaConsumer = new KafkaConsumer(); 
     // Configure Kafka consumer 
     kafkaConsumer.initialize(); 
     // Start consumption 
     kafkaConsumer.consume(); 
    } 

}

回答

1

從本質上講,所有你需要做的就是啓動多個消費者都是一樣的消費者小組。如果您使用的是kafka 0.9或更高版本的新消費者,或者您正在使用高級消費者,則kafka將負責劃分分區,以確保每個分區均由一位消費者閱讀。如果你的分區比消費者多,那麼一些消費者會收到來自多個分區的消息,但是不會有多個消費者從同一個消費者組讀取分區,以確保消息不會被複制。所以你永遠不會想要比分區更多的消費者,因爲一些消費者會閒置。您還可以使用簡單的消費者微調哪個消費者讀取每個分區https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

看來您正在使用來自Kafka 0.8或之前的舊消費者。你可能想考慮切換到新的消費者。 http://kafka.apache.org/documentation.html#intro_consumers

下面是使用新的消費寫作消費者的詳細示例的另一個好文章:http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/