
Kafka Streams with Connect JSON values: Java

I want to consume JSON messages from Kafka using the Kafka Streams API together with the Connect JSON serializer/deserializer. I tried searching Google, but I could not find any substantial information on how to read JSON messages with the Streams API.

So, with the limited knowledge I have, I tried the following approach.

package com.kafka.api.serializers.json; 

import java.util.Properties; 

import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.ForeachAction; 
import org.apache.kafka.streams.kstream.KStream; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 

import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 

public class ConsumerUtilities { 

    private static ObjectMapper om = new ObjectMapper(); 

    public static Properties getProperties() { 

     Properties configs = new Properties(); 
     configs.put(StreamsConfig.APPLICATION_ID_CONFIG, 
       "Kafka test application"); 
     configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
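     // NOTE: the two entries below name a plain Serializer/Deserializer, 
     // not a Serde -- this mismatch is what the exception further down complains about 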
     configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.ByteArraySerializer"); 
     configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
       "org.apache.kafka.connect.json.JsonDeserializer"); 
     return configs; 
    } 

    public static KStreamBuilder getStreamingConsumer() { 
     KStreamBuilder builder = new KStreamBuilder(); 
     return builder; 
    } 

    public static void printStreamData() { 
     KStreamBuilder builder = getStreamingConsumer(); 
     KStream<String, JsonNode> kStream = builder.stream("test"); 
     kStream.foreach(new ForeachAction<String, JsonNode>() { 
      @Override 
      public void apply(String key, JsonNode value) { 
       try { 
        System.out.println(key + " : " + om.treeToValue(value, Person.class)); 
       } catch (JsonProcessingException e) { 
        // a real application would handle this more gracefully 
        e.printStackTrace(); 
       } 
      } 

     }); 

     KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties()); 
     kafkaStreams.start(); 
    } 

} 

package com.kafka.api.serializers.json; 

import com.fasterxml.jackson.databind.JsonNode; 
import com.fasterxml.jackson.databind.ObjectMapper; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import java.util.Properties; 

public class ProducerUtilities { 

    private static ObjectMapper om = new ObjectMapper(); 


    public static org.apache.kafka.clients.producer.Producer<String, JsonNode> getProducer() { 
     Properties configProperties = new Properties(); 
     configProperties.put(ProducerConfig.CLIENT_ID_CONFIG, 
       "kafka json producer"); 
     configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
       "localhost:9092"); 
     configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.common.serialization.ByteArraySerializer"); 
     configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
       "org.apache.kafka.connect.json.JsonSerializer"); 

     org.apache.kafka.clients.producer.Producer<String, JsonNode> producer = new KafkaProducer<String, JsonNode>(
       configProperties); 
     return producer; 
    } 

    public ProducerRecord<String,JsonNode> createRecord(Person person){ 
     JsonNode jsonNode = om.valueToTree(person); 
     ProducerRecord<String,JsonNode> record = new ProducerRecord<String,JsonNode>("test",jsonNode); 
     return record; 
    } 

} 
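
The Person class is referenced above but never shown in the post. For reference, here is a minimal sketch of what it presumably looks like, plus a hypothetical driver; the fields, package, and wiring are assumptions purely for illustration.

package com.kafka.api.serializers.json; 

import com.fasterxml.jackson.databind.JsonNode; 

// Hypothetical POJO -- the real Person fields are not shown in the post. 
// Jackson needs the no-arg constructor and getters/setters to (de)serialize it. 
public class Person { 

    private String name; 
    private int age; 

    public Person() { } 

    public Person(String name, int age) { 
     this.name = name; 
     this.age = age; 
    } 

    public String getName() { return name; } 
    public void setName(String name) { this.name = name; } 
    public int getAge() { return age; } 
    public void setAge(int age) { this.age = age; } 

    @Override 
    public String toString() { 
     return "Person [name=" + name + ", age=" + age + "]"; 
    } 
} 

// Hypothetical driver: publish one JSON record, then start the stream. 
class Main { 
    public static void main(String[] args) { 
     ProducerUtilities utils = new ProducerUtilities(); 
     org.apache.kafka.clients.producer.Producer<String, JsonNode> producer = 
       ProducerUtilities.getProducer(); 
     producer.send(utils.createRecord(new Person("alice", 30))); 
     producer.close(); 

     ConsumerUtilities.printStreamData(); 
    } 
} 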

When I run it, I get the following exception:

[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group Kafka test application failed on partition assignment 
org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770) 
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40) 
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078) 
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255) 
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68) 
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde 
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248) 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764) 
    ... 19 more 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Shutting down 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Stream thread shutdown complete 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed. 
[Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29] State transition from REBALANCING to ERROR. 
Exception in thread "Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [Kafka test application-d3a307c9-d998-421f-829c-85532efc8b29-StreamThread-1] Failed to rebalance. 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:543) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class org.apache.kafka.connect.json.JsonDeserializer 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:770) 
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59) 
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:40) 
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:138) 
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1078) 
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:255) 
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:245) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1147) 
    at org.apache.kafka.streams.processor.internals.StreamThread.access$800(StreamThread.java:68) 
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:184) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) 
    ... 3 more 
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde 
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248) 
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:764) 
    ... 19 more 

I am looking for some guidance on fixing this code.

Update: I created a custom serializer and deserializer as Matthias suggested:

package com.kafka.api.utilities; 

import java.util.Properties; 

import org.apache.kafka.streams.KafkaStreams; 
import org.apache.kafka.streams.StreamsConfig; 
import org.apache.kafka.streams.kstream.ForeachAction; 
import org.apache.kafka.streams.kstream.KStream; 
import org.apache.kafka.streams.kstream.KStreamBuilder; 

import com.kafka.api.models.Person; 
import com.kafka.api.serdes.JsonDeserializer; 
import com.kafka.api.serdes.JsonSerializer; 
import org.apache.kafka.common.serialization.Serdes; 
import org.apache.kafka.common.serialization.Serde; 

public class ConsumerUtilities { 

    //private static ObjectMapper om = new ObjectMapper(); 

    public static Properties getProperties() { 

     Properties configs = new Properties(); 
     configs.put(StreamsConfig.APPLICATION_ID_CONFIG, 
       "Kafka test application"); 
     configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
//  configs.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
//    "org.apache.kafka.common.serialization.ByteArraySerializer"); 
//  configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
//    "org.apache.kafka.connect.json.JsonDeserializer"); 
     return configs; 
    } 

    public static KStreamBuilder getStreamingConsumer() { 
     KStreamBuilder builder = new KStreamBuilder(); 
     return builder; 
    } 

    public static void printStreamData() { 
     JsonSerializer<Person> personJsonSerializer = new JsonSerializer<>(); 
     JsonDeserializer<Person> personJsonDeserializer = new JsonDeserializer<>(Person.class); 
     Serde<Person> personSerde = Serdes.serdeFrom(personJsonSerializer, personJsonDeserializer); 

     KStreamBuilder builder = getStreamingConsumer(); 
     KStream<String, Person> kStream = builder.stream(Serdes.String(),personSerde , "test"); 
     kStream.foreach(new ForeachAction<String, Person>() { 
      @Override 
      public void apply(String key, Person value) { 
       System.out.println(key + " : " + value.toString()); 
      } 

     }); 

     KafkaStreams kafkaStreams = new KafkaStreams(builder, getProperties()); 
     kafkaStreams.start(); 
    } 

} 

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Deserializer; 

import com.fasterxml.jackson.databind.ObjectMapper; 

public class JsonDeserializer<T> implements Deserializer<T>{ 

    private ObjectMapper om = new ObjectMapper(); 
    private Class<T> type; 

    /* 
     * Captures the target type to bind the JSON to; Kafka can also 
     * supply it via configure() under the "type" key. 
     */ 
    public JsonDeserializer(Class<T> type) { 
     this.type = type; 
    } 
    @Override 
    public void close() { 
     // nothing to clean up 

    } 

    @SuppressWarnings("unchecked") 
    @Override 
    public void configure(Map<String, ?> map, boolean arg1) { 
     if(type == null){ 
      type = (Class<T>) map.get("type"); 
     } 

    } 

    @Override 
    public T deserialize(String topic, byte[] bytes) { 
     if(bytes == null || bytes.length == 0){ 
      return null; 
     } 

     try{ 
      return om.readValue(bytes, type); 
     }catch(Exception e){ 
      throw new SerializationException(e); 
     } 
    } 

    protected Class<T> getType(){ 
     return type; 
    } 

} 

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.errors.SerializationException; 
import org.apache.kafka.common.serialization.Serializer; 

import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.ObjectMapper; 

public class JsonSerializer<T> implements Serializer<T> { 

    private ObjectMapper om = new ObjectMapper(); 

    @Override 
    public void close() { 
     // nothing to clean up 

    } 

    @Override 
    public void configure(Map<String, ?> config, boolean isKey) { 
     // no configuration needed 

    } 

    @Override 
    public byte[] serialize(String topic, T data) { 
     try { 
      return om.writeValueAsBytes(data); 
     } catch (JsonProcessingException e) { 
      // preserve the cause so the root problem shows up in the stack trace 
      throw new SerializationException(e); 
     } 
    } 

} 
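
Before wiring these into Streams, a quick way to rule out the serde pair itself is a round-trip check outside Kafka. A minimal sketch, assuming the Person class from above (SerdeRoundTripCheck is a hypothetical name):

package com.kafka.api.serdes; 

import com.kafka.api.models.Person; 

// Round-trip sanity check, run outside Kafka: if this prints an equivalent 
// Person back, the serializer/deserializer pair is consistent and any 
// failure must come from the bytes actually stored in the topic. 
public class SerdeRoundTripCheck { 
    public static void main(String[] args) { 
     JsonSerializer<Person> ser = new JsonSerializer<>(); 
     JsonDeserializer<Person> de = new JsonDeserializer<>(Person.class); 

     byte[] bytes = ser.serialize("test", new Person("alice", 30)); 
     System.out.println(de.deserialize("test", bytes)); 
    } 
} 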

Exception: after running the streams application, I got the exception below. I am confused.

[Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b] State transition from RUNNING to ERROR. 
Exception in thread "Kafka test application-cee84455-78ca-4a2f-881a-89b3c3a00e4b-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=test, partition=0, offset=0 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [B@...; line: 1, column: 11] 
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'hello': was expecting ('true', 'false' or 'null') 
at [Source: [B@...; line: 1, column: 11] 
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1702) 
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:558) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3524) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2686) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878) 
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:772) 
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3834) 
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3783) 
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929) 
    at com.kafka.api.serdes.JsonDeserializer.deserialize(JsonDeserializer.java:43) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55) 
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56) 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) 

Answer


The Streams API needs to read and write data, and therefore it uses the Serde abstraction, which is a wrapper for a serializer and a deserializer at the same time. This is basically what the exception says:

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.connect.json.JsonDeserializer is not an instance of org.apache.kafka.common.serialization.Serde

Thus, you need to wrap your JsonSerializer and JsonDeserializer into a JsonSerde, and use this JsonSerde in your StreamsConfig.

The simplest way is to use the Serdes.serdeFrom(...) method (note: Serdes -- plural). As an alternative, you can also implement the Serde interface (note: Serde -- singular) directly. You can find examples of how to implement the Serde interface in the Serdes class. A sketch of the second option follows.
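
A minimal sketch of implementing Serde directly, reusing the question's JsonSerializer/JsonDeserializer; PersonSerde is a hypothetical name, and the method set assumes the Kafka 0.11-era Serde interface:

package com.kafka.api.serdes; 

import java.util.Map; 

import org.apache.kafka.common.serialization.Deserializer; 
import org.apache.kafka.common.serialization.Serde; 
import org.apache.kafka.common.serialization.Serializer; 

import com.kafka.api.models.Person; 

// Hypothetical Serde wrapping the custom (de)serializers. The no-arg 
// constructor is what makes it registrable by class name in StreamsConfig. 
public class PersonSerde implements Serde<Person> { 

    private final JsonSerializer<Person> serializer = new JsonSerializer<>(); 
    private final JsonDeserializer<Person> deserializer = 
      new JsonDeserializer<>(Person.class); 

    @Override 
    public void configure(Map<String, ?> configs, boolean isKey) { 
     serializer.configure(configs, isKey); 
     deserializer.configure(configs, isKey); 
    } 

    @Override 
    public void close() { 
     serializer.close(); 
     deserializer.close(); 
    } 

    @Override 
    public Serializer<Person> serializer() { return serializer; } 

    @Override 
    public Deserializer<Person> deserializer() { return deserializer; } 
} 

With the no-arg constructor in place, it could then be set as the default value serde: configs.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerde.class);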


As per your suggestion, I have created the serializer and deserializer and wrapped them. But I am getting the above exception. Please help. – dataEnthusiast


The exception comes from your deserializer. I guess you will need to debug that part yourself -- it is not a Streams problem... You can see from the stack trace that your deserializer is being called... Seems like a schema problem? –
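
For what it's worth: the root cause in the trace (Unrecognized token 'hello') means the record at offset 0 of the test topic is not JSON at all, most likely a leftover plain-text message from an earlier console-producer test. Resetting the application against a fresh topic is the simplest fix. Alternatively, the deserializer can be made tolerant of such poison records; a hypothetical sketch, with the caveat that downstream code must then handle null values:

// Variant of JsonDeserializer.deserialize() that logs and skips records 
// that are not valid JSON, instead of killing the stream thread. 
// Note: returning null means downstream operators must be null-tolerant. 
@Override 
public T deserialize(String topic, byte[] bytes) { 
    if (bytes == null || bytes.length == 0) { 
     return null; 
    } 
    try { 
     return om.readValue(bytes, type); 
    } catch (Exception e) { 
     System.err.println("Skipping non-JSON record on topic " + topic 
       + ": " + e.getMessage()); 
     return null; 
    } 
} 

Kafka 1.0 also added a built-in option for this: setting default.deserialization.exception.handler to org.apache.kafka.streams.errors.LogAndContinueExceptionHandler.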