2016-08-03 127 views
2

我一直在試圖找到春季啓動卡夫卡駱駝Avro消費者沒有運氣的示例代碼。我發現春天駱駝卡夫卡消費者和生產者的樣品在以下網址:春季開機卡夫卡駱駝Avro消費者

https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/

但什麼是缺少的是Avro的一部分。我在看駱駝文檔Avro的位置:

http://camel.apache.org/avro.html

我的具體問題是,一旦我的豆從Avro公司架構創建和我有POJO類,我怎麼告訴駱駝春天例如上述用戶Avro序列化?具體來說,我指這行代碼: 從( 「海邊的卡夫卡:本地主機:9092主題=測試& zookeeperHost =本地主機& zookeeperPort = 2181 &的groupId =組別1 & serializerClass = kafka.serializer.StringEncoder」)。豆(kafkaOutputBean.class );

其中串行器是StringEncoder。我如何告訴Camel使用Avro序列化?

+0

不會'serializerClass = ...'放在哪裏?例如'&serializerClass = avro.serializer.StringEncoder' – jny

回答

0

我找到了我自己的答案。因此,我想與你分享。它實際上是serializerClass=org.springframework.integration.kafka.serializer.avro.AvroSerializer。代碼非常簡單,你可以編寫自己的代碼。

public class AvroSerializer<T> { 

     public T deserialize(final byte[] bytes, final DatumReader<T> reader) throws IOException { 
      final Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); 
      return reader.read(null, decoder); 
     } 

     public byte[] serialize(final T input, final DatumWriter<T> writer) throws IOException { 
      final ByteArrayOutputStream stream = new ByteArrayOutputStream(); 

      final Encoder encoder = EncoderFactory.get().binaryEncoder(stream, null); 
      writer.write(input, encoder); 
      encoder.flush(); 

      return stream.toByteArray(); 
     } 
    }