We have a problem with one of our Kafka topics, which is consumed using the DefaultKafkaConsumerFactory & ConcurrentMessageListenerContainer combination described here, with a JsonDeserializer used by the factory. Unfortunately, someone got a little over-enthusiastic and published some invalid messages onto the topic. It appears that spring-kafka silently fails to process past the first of these messages. Is it possible to have spring-kafka log the error and continue? Looking at the error messages that are logged, it seems the Apache kafka-clients library ought to handle the case where, when iterating over a batch of messages, one or more of them fails to parse. How do I configure spring-kafka to ignore malformed messages?

The code below is a sample test case illustrating the issue:

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.common.serialization.Serializer; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.apache.kafka.common.serialization.StringSerializer; 
import org.junit.ClassRule; 
import org.junit.Test; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 
import org.springframework.kafka.core.DefaultKafkaProducerFactory; 
import org.springframework.kafka.core.KafkaTemplate; 
import org.springframework.kafka.listener.KafkaMessageListenerContainer; 
import org.springframework.kafka.listener.MessageListener; 
import org.springframework.kafka.listener.config.ContainerProperties; 
import org.springframework.kafka.support.SendResult; 
import org.springframework.kafka.support.serializer.JsonDeserializer; 
import org.springframework.kafka.support.serializer.JsonSerializer; 
import org.springframework.kafka.test.rule.KafkaEmbedded; 
import org.springframework.kafka.test.utils.ContainerTestUtils; 
import org.springframework.util.concurrent.ListenableFuture; 

import java.util.HashMap; 
import java.util.Map; 
import java.util.Objects; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 

import static org.junit.Assert.assertEquals; 
import static org.junit.Assert.assertThat; 
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey; 
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue; 

/** 
* @author jfreedman 
*/ 
public class TestSpringKafka { 
    private static final String TOPIC1 = "spring.kafka.1.t"; 

    @ClassRule 
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, TOPIC1); 

    @Test 
    public void submitMessageThenGarbageThenAnotherMessage() throws Exception { 
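     // scenario: a valid message (offset 0), unparseable garbage (offset 1), then another
     // valid message (offset 2); both valid messages should still reach the listener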
     final BlockingQueue<ConsumerRecord<String, JsonObject>> records = createListener(TOPIC1); 
     final KafkaTemplate<String, JsonObject> objectTemplate = createPublisher("json", new JsonSerializer<JsonObject>()); 

     sendAndVerifyMessage(records, objectTemplate, "foo", new JsonObject("foo"), 0L); 

     // push some garbage text to Kafka which cannot be marshalled, this should not interrupt processing 
     final KafkaTemplate<String, String> garbageTemplate = createPublisher("garbage", new StringSerializer()); 
     final SendResult<String, String> garbageResult = garbageTemplate.send(TOPIC1, "bar", "bar").get(5, TimeUnit.SECONDS); 
     assertEquals(1L, garbageResult.getRecordMetadata().offset()); 

     sendAndVerifyMessage(records, objectTemplate, "baz", new JsonObject("baz"), 2L); 
    } 

    private <T> KafkaTemplate<String, T> createPublisher(final String label, final Serializer<T> serializer) { 
     final Map<String, Object> producerProps = new HashMap<>(); 
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); 
     producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "TestPublisher-" + label); 
     producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); 
     producerProps.put(ProducerConfig.RETRIES_CONFIG, 2); 
     producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); 
     producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); 
     producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000); 
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer.getClass()); 
     final DefaultKafkaProducerFactory<String, T> pf = new DefaultKafkaProducerFactory<>(producerProps); 
     pf.setValueSerializer(serializer); 
     return new KafkaTemplate<>(pf); 
    } 

    private BlockingQueue<ConsumerRecord<String, JsonObject>> createListener(final String topic) throws Exception { 
     final Map<String, Object> consumerProps = new HashMap<>(); 
     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); 
     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "TestConsumer"); 
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
     consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000); 
     consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     final DefaultKafkaConsumerFactory<String, JsonObject> cf = new DefaultKafkaConsumerFactory<>(consumerProps); 
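     // the deserializer instance set here takes precedence over the
     // VALUE_DESERIALIZER_CLASS_CONFIG class configured above and carries the target type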
     cf.setValueDeserializer(new JsonDeserializer<>(JsonObject.class)); 
     final KafkaMessageListenerContainer<String, JsonObject> container = new KafkaMessageListenerContainer<>(cf, new ContainerProperties(topic)); 
     final BlockingQueue<ConsumerRecord<String, JsonObject>> records = new LinkedBlockingQueue<>(); 
     container.setupMessageListener((MessageListener<String, JsonObject>) records::add); 
     container.setBeanName("TestListener"); 
     container.start(); 
     ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); 
     return records; 
    } 

    private void sendAndVerifyMessage(final BlockingQueue<ConsumerRecord<String, JsonObject>> records, 
             final KafkaTemplate<String, JsonObject> template, 
             final String key, final JsonObject value, 
             final long expectedOffset) throws InterruptedException, ExecutionException, TimeoutException { 
     final ListenableFuture<SendResult<String, JsonObject>> future = template.send(TOPIC1, key, value); 
     final ConsumerRecord<String, JsonObject> record = records.poll(5, TimeUnit.SECONDS); 
     assertThat(record, hasKey(key)); 
     assertThat(record, hasValue(value)); 
     assertEquals(expectedOffset, future.get(5, TimeUnit.SECONDS).getRecordMetadata().offset()); 
    } 

    public static final class JsonObject { 
     private String value; 

     public JsonObject() {} 

     JsonObject(final String value) { 
      this.value = value; 
     } 

     public String getValue() { 
      return value; 
     } 

     public void setValue(final String value) { 
      this.value = value; 
     } 

     @Override 
     public boolean equals(final Object o) { 
      if (this == o) { return true; } 
      if (o == null || getClass() != o.getClass()) { return false; } 
      final JsonObject that = (JsonObject) o; 
      return Objects.equals(value, that.value); 
     } 

     @Override 
     public int hashCode() { 
      return Objects.hash(value); 
     } 

     @Override 
     public String toString() { 
      return "JsonObject{" + 
        "value='" + value + '\'' + 
        '}'; 
     } 
    } 
} 

Confluent Platform has a Schema Registry for this: https://github.com/confluentinc/schema-registry It keeps a schema per topic, and when you produce/consume messages they are validated against that specific schema. Are you deliberately avoiding it? – kvatashydze

Oh, and according to this link: https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics this is only possible when using Apache Avro as the serializer/deserializer. – kvatashydze

Yes, we don't use Avro or Protobuf, just plain JSON –

Answer

I have a solution, but I don't know whether it is the best one. I extended JsonDeserializer as shown below; this causes a null value to be consumed by spring-kafka, which requires the necessary downstream changes to handle that case.

import com.fasterxml.jackson.databind.ObjectMapper 
import org.springframework.kafka.support.serializer.JsonDeserializer 

// `Logging` is assumed to be a project-local trait that exposes a `logger` 
class SafeJsonDeserializer[A >: Null](targetType: Class[A], objectMapper: ObjectMapper) extends JsonDeserializer[A](targetType, objectMapper) with Logging { 
  override def deserialize(topic: String, data: Array[Byte]): A = try { 
    super.deserialize(topic, data) 
  } catch { 
    case e: Exception => 
      logger.error("Failed to deserialize data [%s] from topic [%s]".format(new String(data), topic), e) 
      null 
  } 
} 
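
For comparison with the Java test case in the question, here is a minimal Java sketch of the same approach; the class is illustrative rather than part of the original answer, and the System.err call is a placeholder for real logging:

import org.springframework.kafka.support.serializer.JsonDeserializer;

public class SafeJsonDeserializer<T> extends JsonDeserializer<T> {
    public SafeJsonDeserializer(final Class<T> targetType) {
        super(targetType);
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
        try {
            return super.deserialize(topic, data);
        } catch (final Exception e) {
            // swallow the malformed record and move on; downstream code must tolerate null values
            System.err.println("Failed to deserialize data from topic [" + topic + "]: " + e);
            return null;
        }
    }
}

Wiring it in would only require changing one line in createListener above: cf.setValueDeserializer(new SafeJsonDeserializer<>(JsonObject.class));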

Your solution is correct; it's not really a Spring problem, because Spring never sees the message if it fails in the deserializer. I suppose we could change the deserializer to do something similar to yours, but null might not be the right "object" to return (since null already has a meaning in Kafka when using compacted topics). Feel free to submit a PR. –