2016-08-13 48 views
0
public class MessageHandler { 
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class); 

private void run() { 
    Properties props = new Properties(); 
    props.put("zookeeper.connect", "localhost:2181"); 
    props.put("group.id", "testgroup"); 
    props.put("zookeeper.session.timeout.ms", "500"); 
    props.put("zookeeper.sync.time.ms", "250"); 
    props.put("auto.commit.interval.ms", "1000"); 
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); 

    String topic = "mytopic"; 
    Map<String, Integer> topicCount = new HashMap<String, Integer>(); 
    topicCount.put(topic, 2); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumerConnector.createMessageStreams(topicCount); 
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); 
    int thread = 0; 
    LOGGER.info("size: {}", streams.size()); 
    ExecutorService executorService = Executors.newFixedThreadPool(2); 
    for (final KafkaStream stream : streams) { 
     final int tid = thread++; 
     LOGGER.info("submit thread {}", tid); 
     executorService.execute(new Runnable() { 
      @Override 
      public void run() { 
       for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : (Iterable<MessageAndMetadata<byte[], byte[]>>) stream) { 
        byte[] key = messageAndMetadata.key(); 
        String message = new String(messageAndMetadata.message()); 
        LOGGER.info("key: {} message: {} thread: {}", key, message, tid); 
       } 
      } 
     }); 
    } 

    if (consumerConnector != null) 
     consumerConnector.shutdown(); 
} 

public static void main(String[] args) { 
    new MessageHandler().run(); 
} 

}卡夫卡多線程comsumer拋出ClosedChannelException

運行此消費後,我得到這個異常:

WARN 2016-08-13 22:46:56.969] [testgroup_debian-1471099616127-8c8586c4-leader-finder-thread] kafka.utils.Logging$class.warn(Logging.scala:89) [Fetching topic metadata with correlation id 0 for topics [Set(mytopic)] from broker [BrokerEndPoint(0,debian,9092)] failed] 
java.nio.channels.ClosedChannelException 
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) 
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80) 
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124) 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59) 
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94) 
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66) 
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) 

爲什麼會出現這種異常? 經紀人和動物園管理員的配置應該沒問題,因爲我可以使用控制檯生產者/消費者發送/接收消息。

回答

0

消費者是單線程的,而不是線程安全的。如果你想從兩個線程中消耗,每個線程都需要它自己的ConsumerConnector實例。

+0

那麼,如果我只能使用一個線程,那麼獲取多個流(主題計數> 1)有什麼意義?迭代器會阻塞,我永遠不會訪問其他流。 –

0

我似乎找到了問題。致電

consumerConnector.shutdown(); 

在消耗任何消息之前立即關閉連接。