2017-10-05 97 views
0

代碼發送Avro的消息到卡夫卡的話題錯誤登記的Avro架構:「串」

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
        io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
        io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
      props.put("schema.registry.url", "http://localhost:8081"); 
      producer = new KafkaProducer<>(props); 

    public void send(List<String> results){ 
      TestCallback callback = new TestCallback(); 
      for (Object result : results) { 
       ProducerRecord<String, String> record = new ProducerRecord(topic,result.toString());<==confused at this point 
       producer.send(record,callback); 

      } 
      producer.close(); 
     } 

發送方法包含從SQL查詢中提取的記錄列表。

錯誤

2017-10-05 23:54:36 DEBUG RestService:118 - Sending POST with input {"schema":"\"string\""} to http://localhost:8081/subjects/my_topicq1-value/versions 
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string" 

在一天結束的時候,我想取從主題,並通過使用卡夫卡連接-HDFS投入到HDFS這些記錄。

你能否給我提供一些意見,以便我可以繼續。 謝謝。

回答

0

如果您正在使用SQL記錄,您是否需要定製生產者 - 您可以直接使用Kafka Connect JDBC Connector從SQL查詢中提取數據,然後Kafka Connect將它歸入HDFS?