2017-06-06

I have an existing Avro file with a schema, and I need to hand the file to a Kafka producer. In short: read an existing Avro file and send it to Kafka.

Here is the code I wrote:

public class ProducerDataSample {

    public static void main(String[] args) {

        String topic = "my-topic";

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(AvroSchemaDefinitionLoader.fromFile("encounter.avsc").get());

        File file = new File("/home/hello.avro");
        try {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
            DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
            dataFileWriter.create(schema, outputStream);
            dataFileWriter.appendTo(file);
            dataFileWriter.close();
            System.out.println("Here comes the data: " + outputStream);

            // Start KAFKA publishing

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
            ProducerRecord<String, byte[]> producerRecord = null;
            producerRecord = new ProducerRecord<String, byte[]>("m-topic", "1", outputStream.toByteArray());
            messageProducer.send(producerRecord);
            messageProducer.close();
        } catch (Exception e) {
            System.out.println("Error in sending to kafka");
            e.printStackTrace();
        }
    }
}

When I run this, I get the following error:

Error in sending to kafka
org.apache.avro.AvroRuntimeException: already open
    at org.apache.avro.file.DataFileWriter.assertNotOpen(DataFileWriter.java:85)
    at org.apache.avro.file.DataFileWriter.appendTo(DataFileWriter.java:203)
    at org.apache.avro.file.DataFileWriter.appendTo(DataFileWriter.java:193)
    at ProducerDataSample.main(ProducerDataSample.java:51)

Any help would be appreciated. Thanks.
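The `already open` exception comes from calling `appendTo(file)` on a `DataFileWriter` that `create(schema, outputStream)` has already opened; `appendTo` refuses to open a writer a second time, which is the `assertNotOpen` frame in the stack trace. Note also that `DataFileWriter` only writes Avro container files; it never reads records out of `/home/hello.avro`. Below is a minimal sketch of the two valid ways to open a writer (either `create` for a new file or `appendTo` for an existing one, never both), assuming the Avro Java library (`org.apache.avro`) on the classpath and a hypothetical two-field `Encounter` schema standing in for `encounter.avsc`, whose contents are not shown. The class and helper names are illustrative only.

```java
import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class DataFileWriterUsage {

    // Hypothetical schema standing in for encounter.avsc (not shown in the question).
    public static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Encounter\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"string\"},"
        + "{\"name\":\"count\",\"type\":\"int\"}]}");

    public static GenericRecord encounter(String id, int count) {
        GenericRecord r = new GenericData.Record(SCHEMA);
        r.put("id", id);
        r.put("count", count);
        return r;
    }

    // New container file: open exactly once with create(), then append(), then close().
    public static void writeNew(File file, GenericRecord... records) throws IOException {
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(SCHEMA))) {
            writer.create(SCHEMA, file);
            for (GenericRecord r : records) {
                writer.append(r);
            }
        }
    }

    // Existing container file: appendTo() is the only "open" call, no create() beforehand.
    public static void appendExisting(File file, GenericRecord... records) throws IOException {
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(SCHEMA))) {
            writer.appendTo(file);
            for (GenericRecord r : records) {
                writer.append(r);
            }
        }
    }

    // Reading records back requires DataFileReader; the schema comes from the file header.
    public static int countRecords(File file) throws IOException {
        int n = 0;
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
            while (reader.hasNext()) {
                reader.next();
                n++;
            }
        }
        return n;
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("encounters", ".avro");
        writeNew(file, encounter("a", 1), encounter("b", 2));
        appendExisting(file, encounter("c", 3));
        System.out.println(countRecords(file)); // prints 3
    }
}
```

Since the goal here is to get records out of an existing file, the reading side (`DataFileReader`) is the part that matters; the writer calls only explain where the exception comes from.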

Answer


You will have to read the data from the Avro file and serialize it into a byte array,

something like the snippet below:

    final Schema schema = new Schema.Parser().parse(new File("sample.avsc"));
    File file = new File("sample.avro");

    // Read the Avro file into GenericRecords
    final GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>(schema);
    final DataFileReader<GenericRecord> genericRecords = new DataFileReader<>(file, genericDatumReader);

    // Serialize the GenericRecords into raw Avro binary
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);

    Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);

    while (genericRecords.hasNext()) {
        writer.write(genericRecords.next(), binaryEncoder);
    }
    binaryEncoder.flush();
    out.close();
    genericRecords.close();
    // Send out.toByteArray() to Kafka
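The snippet above writes every record into one byte array using raw Avro binary encoding, without the container-file header, so whoever consumes the Kafka message must already know the schema. The self-contained sketch below follows the same approach and adds the matching decode step, which reads records until `BinaryDecoder.isEnd()` reports the input is exhausted. Assumptions: the Avro Java library on the classpath; a hypothetical one-field `Sample` schema and a temp file standing in for `sample.avsc` and `sample.avro`; and the writer schema taken from the file header via `DataFileReader.getSchema()` instead of a separate `.avsc`. The names `fileToBytes`, `bytesToRecords` and `writeSample` are illustrative only.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroFileToBytes {

    // Producer side: read every record from a container file and encode them back-to-back.
    public static byte[] fileToBytes(File file) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<>(file, new GenericDatumReader<GenericRecord>())) {
            Schema schema = reader.getSchema(); // writer schema stored in the file header
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            while (reader.hasNext()) {
                writer.write(reader.next(), encoder);
            }
            encoder.flush();
        }
        return out.toByteArray();
    }

    // Consumer side: decode records from the byte array until no input is left.
    public static List<GenericRecord> bytesToRecords(byte[] bytes, Schema schema) throws IOException {
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        List<GenericRecord> records = new ArrayList<>();
        while (!decoder.isEnd()) {
            records.add(reader.read(null, decoder));
        }
        return records;
    }

    // Hypothetical helper: writes one record per id into a new container file.
    public static void writeSample(File file, Schema schema, String... ids) throws IOException {
        try (DataFileWriter<GenericRecord> w =
                 new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            w.create(schema, file);
            for (String id : ids) {
                GenericRecord r = new GenericData.Record(schema);
                r.put("id", id);
                w.append(r);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical schema and data standing in for sample.avsc / sample.avro.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Sample\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
        File file = File.createTempFile("sample", ".avro");
        writeSample(file, schema, "a", "b", "c");

        byte[] payload = fileToBytes(file); // would be the value of a ProducerRecord<String, byte[]>
        List<GenericRecord> decoded = bytesToRecords(payload, schema);
        System.out.println(decoded.size() + " records decoded"); // prints "3 records decoded"
    }
}
```

If each Kafka message should carry exactly one record instead, a common variation is to encode each record into its own `ByteArrayOutputStream` and send one `ProducerRecord<String, byte[]>` per record.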

Thanks @liju john. It worked. :) – abhi5800