1

我正在嘗試製作一個正在偵聽特定主題並將消費的消息處理爲JSON的kafka消費者。我試着按照春季文檔here給出的方法,但我無法將消息作爲JSON獲取。Spring-Kafka反序列化

這是我的接收機設置代碼:

@Configuration 
@EnableKafka 
public class ReceiverConfig { 

@Value("${kafka.bootstrap.servers}") 
private String bootstrapServers; 

@Bean 
public Map consumerConfigs() { 
    Map props = new HashMap<>(); 
    // list of host:port pairs used for establishing the initial connections 
    // to the Kakfa cluster 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    // consumer groups allow a pool of processes to divide the work of 
    // consuming and processing records 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "Waitlist"); 

    return props; 
} 

@Bean 
public ConsumerFactory consumerFactory() { 
    return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
} 

@Bean 
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 

    return factory; 
} 

@Bean 
public Receiver receiver() { 
    return new Receiver(); 
} 

@Bean 
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() { 
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = 
     new ConcurrentKafkaListenerContainerFactory<>(); 
    factory.setConsumerFactory(consumerFactory()); 
    factory.setMessageConverter(new StringJsonMessageConverter()); 
    return factory; 
} 
} 

消費者:

public class Receiver { 

private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class); 

private CountDownLatch latch = new CountDownLatch(1); 

@KafkaListener(topics = "Reservation", 
     containerFactory = "kafkaJsonListenerContainerFactory") 
public void receiveMessage(Message<?> message) { 
    LOGGER.info("received message='{}'", message); 
    latch.countDown(); 
} 

public CountDownLatch getLatch() { 
    return latch; 
} 

} 

當我嘗試發佈遠程服務器上,我得到以下錯誤的一個話題:

 2017-02-09 13:42:49.122 [1;31mERROR[0;39m [36mo.s.k.listener.LoggingErrorHandler[0;39m Error while processing: ConsumerRecord(topic = Reservation, partition = 0, offset = 3394, CreateTime = 1486626082480, checksum = 1777660938, serialized key size = -1, serialized value size = 2, key = null, value = hi) 
     org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null') 
     at [Source: hi; line: 1, column: 5] 
      at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:81) 
      at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:82) 
      at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:157) 
      at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:68) 
      at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:764) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:708) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2500(KafkaMessageListenerContainer.java:230) 
      at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:975) 
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
      at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
      at java.lang.Thread.run(Thread.java:745) 
     Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hi': was expecting ('true', 'false' or 'null') 
     at [Source: hi; line: 1, column: 5] 
      at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) 
      at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2835) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1903) 
      at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:749) 
      at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834) 
      at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783) 
      at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2880) 
      at org.springframework.kafka.support.converter.StringJsonMessageConverter.extractAndConvertValue(StringJsonMessageConverter.java:78) 
      ... 11 common frames omitted 

但是,如果我從偵聽器中刪除containerfactory,我可以接收消息但它們不在J中SON的格式,但作爲字符串:

2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='{' 
2017-02-09 15:04:58.408 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_eventType":"Reservation",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_timestamp":"2017-01-23T09:19:35Z",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "_operation":"create",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "type":"excursion",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "reservationId":"46d353ac_9575_492a_9291_98d15bf4cc82",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "eventReservationLinkId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "master":true,' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "partySize":2,' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "startTime":"2017-01-27T08:30:00Z",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "endTime":"2017-01-27T10:00:00Z",' 
2017-02-09 15:04:58.417 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "timeslotId":"c2304a34_b9ba_4f3c_8e45_3e3c7677d6c2",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "variantSku":"ocean_polar_1606_FLL-640B",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "guestId":"378741",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "createdBy":"149673",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "purchaser":"143679",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "eventId":"ocean_polar_1606_FLL-640",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "scheduledEventId":"02c95434_3a99_452e_a2a8_51712683926c",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "resourceId":"",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "surpriseFlag":false,' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "venueId":"FLL001",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "status":"CONFIRMED",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "primaryId":"378741",' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message=' "partyId":"9b0bafb4_406e_43ae_94f2_36a913ce23d2"' 
2017-02-09 15:04:58.418 [34mINFO [0;39m [36mc.c.c.kafka.consumer.Receiver[0;39m received message='}' 

回答

0

你的形式交往是一個JSON文件

received message='{' 
received message=' "_eventType":"Reservation",' 
received message=' "_timestamp":"2017-01-23T09:19:35Z",' 
... 

的個別片段爲了從JSON轉換,它需要在一個單一的消息進行封裝。

+0

這意味着它是'生產者'端的責任。謝謝@加里! – Kuber