1
我正在關注此網址https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example中提供的有關卡夫卡主題同時消費的示例。同時作爲卡夫卡消費者使用多個主題
在創建線程池的部分,它們具有以下代碼
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
我可以給topicCountMap添加更多主題。例如,
topicCountMap.put("channel1", new Integer(a_numThreads));
topicCountMap.put("channe2", new Integer(a_numThreads));
topicCountMap.put("channel3", new Integer(a_numThreads));
在上面的代碼,在我看來,這些流對象只能映射到的主題
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
我不是如何創建多個數據流對象完全肯定一個,每個映射到給定的主題,然後遍歷它們從每個渠道獲取數據,並讓它們提交給執行者。
爲什麼你需要每個主題的執行者? – nachokk