2015-09-25

Consuming multiple topics simultaneously as a Kafka consumer

I am following the example at https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example for consuming Kafka topics concurrently.

In the part where the thread pool is created, they have the following code:

public void run(int a_numThreads) { 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    topicCountMap.put(topic, new Integer(a_numThreads)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 


    // now launch all the threads 
    // 
    executor = Executors.newFixedThreadPool(a_numThreads); 

    // now create an object to consume the messages 
    // 
    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new ConsumerTest(stream, threadNumber)); 
     threadNumber++; 
    } 
} 

I can add more topics to topicCountMap. For example:

topicCountMap.put("channel1", new Integer(a_numThreads)); 
topicCountMap.put("channel2", new Integer(a_numThreads)); 
topicCountMap.put("channel3", new Integer(a_numThreads)); 

In the code above, it seems to me that the streams object can only be mapped to a single topic:

List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

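What I'm not sure about is how to create multiple stream objects, each mapped to one of the given topics, and then iterate over them to get the data from each channel and submit them to the executor.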

Answer


Suppose you have:

String topic1 = "channel1"; 
String topic2 = "channel2"; 
String topic3 = "channel3"; 

Then you can indeed do this:

topicCountMap.put(topic1, new Integer(a_numThreads_topic1)); 
topicCountMap.put(topic2, new Integer(a_numThreads_topic2)); 
topicCountMap.put(topic3, new Integer(a_numThreads_topic3)); 
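The three put() calls can also be written as a loop over a list of topic names. The sketch below is plain Java with no Kafka dependency; `buildTopicCountMap` and `totalStreams` are hypothetical helper names, and giving every topic the same count is an assumption made for brevity (per-topic counts go into the map the same way). As far as I know, with the 0.8 high-level consumer each topic's list in consumerMap holds as many streams as the count you requested, so the sum of the values is the total number of streams you will get back.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicCountMapSketch {
    // Builds the topic -> stream-count map that is passed to createMessageStreams().
    // Assumption: every topic gets the same count; per-topic counts work the same way.
    static Map<String, Integer> buildTopicCountMap(List<String> topics, int streamsPerTopic) {
        Map<String, Integer> topicCountMap = new LinkedHashMap<>();
        for (String topic : topics) {
            topicCountMap.put(topic, streamsPerTopic); // autoboxing instead of new Integer(...)
        }
        return topicCountMap;
    }

    // Sum of the requested counts, i.e. the total number of streams expected back.
    static int totalStreams(Map<String, Integer> topicCountMap) {
        int total = 0;
        for (int count : topicCountMap.values()) {
            total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> m =
                buildTopicCountMap(Arrays.asList("channel1", "channel2", "channel3"), 2);
        System.out.println(m);               // {channel1=2, channel2=2, channel3=2}
        System.out.println(totalStreams(m)); // 6
    }
}
```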

Once you have your consumerMap (the code that creates it stays unchanged), you can retrieve the streams for each topic:

List<KafkaStream<byte[], byte[]>> topic1_streams = consumerMap.get(topic1); 
List<KafkaStream<byte[], byte[]>> topic2_streams = consumerMap.get(topic2); 
List<KafkaStream<byte[], byte[]>> topic3_streams = consumerMap.get(topic3); 

To consume from the streams, you need to create thread pools (executors) with the right number of threads:

ExecutorService executors_topic1 = Executors.newFixedThreadPool(a_numThreads_topic1); 
ExecutorService executors_topic2 = Executors.newFixedThreadPool(a_numThreads_topic2); 
ExecutorService executors_topic3 = Executors.newFixedThreadPool(a_numThreads_topic3); 

Finally:

int threadNumber = 0; 
for (final KafkaStream<byte[], byte[]> stream : topic1_streams) { 
    executors_topic1.submit(new ConsumerTest(stream, threadNumber)); 
    threadNumber++; 
} 
for (final KafkaStream<byte[], byte[]> stream : topic2_streams) { 
    executors_topic2.submit(new ConsumerTest(stream, threadNumber)); 
    threadNumber++; 
} 
for (final KafkaStream<byte[], byte[]> stream : topic3_streams) { 
    executors_topic3.submit(new ConsumerTest(stream, threadNumber)); 
    threadNumber++; 
} 

Of course, this is just to give you the idea. The code can obviously be improved.
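One possible improvement is to loop over consumerMap.entrySet() and submit every stream, across all topics, to a single pool sized to the total number of streams. The sketch below is a minimal, hedged version of that idea: to compile without the Kafka jar, the type parameter `S` stands in for `KafkaStream<byte[], byte[]>` and the `workerFactory` function stands in for the `ConsumerTest` constructor; `SinglePoolDispatch` and `dispatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

public class SinglePoolDispatch {
    // Submits one worker per stream, for every topic, to one shared pool.
    // S is a stand-in for KafkaStream<byte[], byte[]>; workerFactory is a
    // stand-in for new ConsumerTest(stream, threadNumber) (both assumptions).
    static <S> ExecutorService dispatch(Map<String, List<S>> consumerMap,
                                        BiFunction<S, Integer, Runnable> workerFactory) {
        // One thread per stream, so no stream waits for a free thread.
        int totalStreams = 0;
        for (List<S> streams : consumerMap.values()) {
            totalStreams += streams.size();
        }
        // newFixedThreadPool rejects 0, so use at least one thread.
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, totalStreams));

        int threadNumber = 0;
        for (Map.Entry<String, List<S>> entry : consumerMap.entrySet()) {
            for (S stream : entry.getValue()) {
                executor.submit(workerFactory.apply(stream, threadNumber));
                threadNumber++;
            }
        }
        return executor;
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in data: each "stream" is just a label.
        Map<String, List<String>> consumerMap = new LinkedHashMap<>();
        consumerMap.put("channel1", Arrays.asList("c1-s0", "c1-s1"));
        consumerMap.put("channel2", Arrays.asList("c2-s0"));
        consumerMap.put("channel3", Arrays.asList("c3-s0", "c3-s1"));

        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor =
                dispatch(consumerMap, (stream, n) -> () -> seen.add(n + ":" + stream));
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(seen.size()); // 5
    }
}
```

The pool gets one thread per stream because, with the old high-level consumer, iterating a KafkaStream blocks while it waits for messages (unless `consumer.timeout.ms` is set), so each worker holds its thread for as long as it runs. A pool smaller than the number of streams would leave some streams unconsumed. Whether one pool or one per topic is preferable depends on whether you want to size or shut down each topic's workers independently.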


Why do you need an executor per topic? – nachokk