I am using a Processor to consume byte-array data (via byte-array serdes) from a topic, process it into GenericRecords (based on a schema I obtain from an HTTP GET request), and send those records to a topic backed by an Avro schema registry. I get a NullPointerException / Not Found exception when I try to process and sink the data with the Avro schema.
Retrieving the schema from the HTTP GET request works fine, and so does mapping my data onto it to produce a GenericRecord that follows the schema (a rough sketch of that step follows the stack trace below). However, when I try to sink it to the topic, I get a NullPointerException:
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at streamProcessor.XXXXprocessor.process(XXXXprocessor.java:80)
at streamProcessor.XXXXprocessor.process(XXXXprocessor.java:1)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:111)
at streamProcessor.SelectorProcessor.process(SelectorProcessor.java:33)
at streamProcessor.SelectorProcessor.process(SelectorProcessor.java:1)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
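For context, the processing step described above presumably looks roughly like the following sketch (the schemaJson variable and the field name are assumptions for illustration, not the actual XXXXprocessor code); it is the context.forward(...) hand-off to the Avro sink that ends in the serializer NPE:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// Inside XXXXprocessor.process(key, rawBytes) -- hypothetical names:
Schema schema = new Schema.Parser().parse(schemaJson);    // schemaJson: the schema string fetched via HTTP GET
GenericRecord avroRecord = new GenericData.Record(schema);
avroRecord.put("someField", "someValue");                 // hypothetical mapping from the raw bytes
context.forward(key, avroRecord);                         // hands the record to the Avro sink, where serialization fails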
Here is my topology code:
//Stream Properties
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor-kafka-streams234");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxxxxxxxxxxxx:xxxx");
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

//Build topology
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("messages-source", "mytest2");
builder.addProcessor("selector-processor", () -> new SelectorProcessor(), "messages-source");
builder.addProcessor("XXXX-processor", () -> new XXXXprocessor(), "selector-processor");
builder.addSink("XXXX-sink", "XXXXavrotest", new KafkaAvroSerializer(), new KafkaAvroSerializer(), "XXXX-processor");

//Start Streaming
KafkaStreams streaming = new KafkaStreams(builder, config);
streaming.start();
System.out.println("processor streaming...");
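One thing worth noting about the sink line above: the two KafkaAvroSerializer instances are created with the no-argument constructor and are never configured with a schema registry URL, so their internal registry client is presumably still null when serializeImpl runs, which would match the NPE in the trace. A minimal sketch of configuring them by hand before handing them to addSink (the registry URL is a placeholder; this is one possible wiring, not necessarily the only fix):

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import java.util.Collections;
import java.util.Map;

// Give each serializer a registry URL so it can build its own schema-registry client.
Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://xxxxxxxxxxxxxxxxxxxxxx:xxxx");       // placeholder registry URL

KafkaAvroSerializer keySerializer = new KafkaAvroSerializer();
keySerializer.configure(serdeConfig, true);          // true  = key serializer
KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer();
valueSerializer.configure(serdeConfig, false);       // false = value serializer

builder.addSink("XXXX-sink", "XXXXavrotest", keySerializer, valueSerializer, "XXXX-processor");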
After some reading on question forums, I found that I might need to inject a SchemaRegistryClient when creating the KafkaAvroSerializers, so I changed that line to:
SchemaRegistryClient client = new CachedSchemaRegistryClient("xxxxxxxxxxxxxxxxxxxxxx:xxxx/subjects/xxxxschemas/versions", 1000);
builder.addSink("XXXX-sink", "XXXXavrotest", new KafkaAvroSerializer(client), new KafkaAvroSerializer(client), "XXXX-processor");
That results in an HTTP 404 Not Found exception...
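For what it's worth, CachedSchemaRegistryClient expects only the registry's base URL and builds the /subjects/... request paths itself, so passing a URL that already ends in /subjects/xxxxschemas/versions could plausibly be what produces the 404. A minimal sketch with the host and port left as placeholders:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Base URL only; the client appends /subjects/<subject>/versions on its own.
SchemaRegistryClient client = new CachedSchemaRegistryClient("http://xxxxxxxxxxxxxxxxxxxxxx:xxxx", 1000);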
I guess you don't need the client, but you do need to add the schema registry URL to your StreamsConfig: 'config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "your-URL");' –
Compare: https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java –
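A minimal sketch of what the first comment suggests, applied to the stream configuration shown earlier (the URL is a placeholder); the linked GenericAvroIntegrationTest takes the same approach for its generic-Avro topology:

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

// Point the Avro serializers/serdes at the schema registry via configuration
// instead of constructing a SchemaRegistryClient by hand.
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://xxxxxxxxxxxxxxxxxxxxxx:xxxx");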