2016-03-09 42 views
6

我們正在升級我們的卡夫卡實現爲.9和使用新的消費者java api來創建consumer.I使用下面的代碼爲消費者,我們正在使用設置主題給消費者在LINE A LINE B是對我們的服務的調用,它處理我們收到的消息。現在的問題是,如果我們的消息處理需要30秒以上,我們會收到異常。kafka升級到0.9與新的消費者API

Properties props = new Properties(); 
      props.put("bootstrap.servers", "localhost:9092"); 
      props.put("group.id", "test-group"); 
      props.put("auto.offset.reset", "earliest"); 
      props.put("heartbeat.interval.ms", "1000"); 
      props.put("receive.buffer.bytes", 10485760); 
      props.put("fetch.message.max.bytes", 5242880); 
      props.put("enable.auto.commit", false); 
    //with partition assigned to consumer 


      KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props); 
      // TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0); 
      //consumer.assign(Arrays.asList(partition0)); 
      //assign topic to consumer without partition 
//LINE A 
      consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp()); 
      List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 
      while (true) { 
       try { 
        ConsumerRecords<Object, Object> records = consumer.poll(1000); 
        consumeFromQueue(records);//LINE B 
        consumer.commitSync(); 

       } catch (CommitFailedException e) { 
        e.printStackTrace(); 
        System.out.println("CommitFailedException"); 
       } catch (Exception e) { 
        e.printStackTrace(); 
        System.out.println("Exception in while consuming messages"); 
       } 

例外是

2016年3月3日10:47:35.095 INFO 6448 --- [問調度-3] o.a.k.c.c.internals.AbstractCoordinator:標記協調2147483647死。 2016-03-03 10:47:35.096 ERROR 6448 --- [ask-scheduler-3] oakccinternals.ConsumerCoordinator:錯誤ILLEGAL_GENERATION在爲組TEST-GROUP提交偏移時發生錯誤 CommitFailedException org.apache.kafka.clients。 consumer.CommitFailedException:提交無法完成,因爲org.apache.kafka.clients.consumer.internals.ConsumerCoordinator上的組重新平衡 $ OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552) at org.apache.kafka.clients.consumer。 incenals.ConsumerCoordinator $ OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665) at org.apache.kafka.clients。 consumer.internals.AbstractCoordinator $ CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644) at org.apache.kafka.clients.consumer.internals.RequestFuture $ 1.onSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals。 RequestFuture.fireSuccess(RequestFuture.java:133) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient $在org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient。 java:320) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetwo rkClient.java:213)

上面的異常是在提交偏移時發生的。 任何建議將有助於謝謝

回答

6

發生這種情況是因爲新消費者是單線程的,並且它可以保持與消費者組的心跳的唯一方式是輪詢或提交偏移量,30秒後組協調器標誌着你的消費者已經死亡,並呼籲進行團體重新平衡。 對於這種情況,您可以增加request.timeout.ms或拆分2個線程之間的消耗和處理工作。

+0

感謝您的回覆返回給客戶端,我嘗試添加「request.timeout.ms」到70000,它給我的即使消息處理花了30000也是一樣。 –

+0

不知道爲什麼它沒有起作用。我甚至嘗試將「session.timeout.ms」設置爲70000,但它給了我例外說「org.apache.kafka.common.errors.ApiException:會話超時不在可接受的範圍內。」我在想你的第二個建議,但是如果在處理線程時出現一些問題,我會如何處理它?將它作爲新消息再次添加到主題並在異常之前還原由消息引起的更改? –

+0

增加group.max.session.timeout.ms以及 – Nautilus

0

您可以通過設置

max.partition.fetch.bytes 

到一些合適的閾值是比你的大郵件大,但如此之低,你會得到每個輪詢更少的消息限制()以投票表決返回的消息的數量。

卡夫卡0.10.x已經支持明確限制消息的數量通過設置

max.poll.records