0
public class MessageHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);
private void run() {
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "testgroup");
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
String topic = "mytopic";
Map<String, Integer> topicCount = new HashMap<String, Integer>();
topicCount.put(topic, 2);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
int thread = 0;
LOGGER.info("size: {}", streams.size());
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (final KafkaStream stream : streams) {
final int tid = thread++;
LOGGER.info("submit thread {}", tid);
executorService.execute(new Runnable() {
@Override
public void run() {
for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : (Iterable<MessageAndMetadata<byte[], byte[]>>) stream) {
byte[] key = messageAndMetadata.key();
String message = new String(messageAndMetadata.message());
LOGGER.info("key: {} message: {} thread: {}", key, message, tid);
}
}
});
}
if (consumerConnector != null)
consumerConnector.shutdown();
}
public static void main(String[] args) {
new MessageHandler().run();
}
}卡夫卡多線程comsumer拋出ClosedChannelException
運行此消費後,我得到這個異常:
WARN 2016-08-13 22:46:56.969] [testgroup_debian-1471099616127-8c8586c4-leader-finder-thread] kafka.utils.Logging$class.warn(Logging.scala:89) [Fetching topic metadata with correlation id 0 for topics [Set(mytopic)] from broker [BrokerEndPoint(0,debian,9092)] failed]
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
爲什麼會出現這種異常? 經紀人和動物園管理員的配置應該沒問題,因爲我可以使用控制檯生產者/消費者發送/接收消息。
那麼,如果我只能使用一個線程,那麼獲取多個流(主題計數> 1)有什麼意義?迭代器會阻塞,我永遠不會訪問其他流。 –