2016-05-03 90 views
0

我想解析一個卡夫卡消息,它是在一些加密的AVRO格式。我有以下AvroSchema.avsc Avro的模式文件:解密卡夫卡Avro消息

{ 
    "type": "record", 
    "namespace": "kafka.events", 
    "name": "AvroSchema", 
     "fields": [ 
      { "name": "product_id", "type": "string" }, 
      { "name": "available_to_promise_quantity", "type": "double" }, 
      { "name": "online_available_to_promise_quantity", "type": "double" }, 
      { "name": "stores_available_to_promise_quantity", "type": "double" }, 
      { "name": "is_infinite_inventory", "type": "boolean", "default" : false }, 
      { "name": "event_timestamp", "type": "long" }, 
      { "name": "previous_event", "type": "AvroSchema" } 
     ] 
} 

現在,我寫了下面的代碼來獲得JSON格式的數據:

for (final KafkaStream<byte[], byte[]> stream : streams){ 
    ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator(); 
    byte[] consumedEncryptedMessage; 
    MessageAndMetadata<byte[], byte[]> consumedEntry; 
    while(consumerIterator.hasNext()){ 
     consumedEntry = consumerIterator.next(); 
      if(null != consumedEntry){ 
       consumedEncryptedMessage = consumedEntry.message(); 
        try { 
          Schema schema = null; 
          schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc")); 
          DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema); 
          Decoder decoder = DecoderFactory.get().binaryDecoder(consumedEncryptedMessage , null); 
          GenericRecord decryptedmsg = null; 
          decryptedmsg = reader.read(null, decoder); 
          System.out.println(decryptedmsg); 
         } 
         catch(Exception e) { 
          e.printStackTrace(); 
          System.out.println(e); 
         } 

請幫助我如何對消息進行解密。

加密字節的消息是這樣的類型:080-21-0001 :�Aw�@@��A�ǐ�U :�Aw�@@��A

我所做的更改的建議,現在我有以下一段代碼:

while(consumerIterator.hasNext()){ 
    consumedEntry = consumerIterator.next(); 
     if(null != consumedEntry){ 
      consumedEncryptedMessage = consumedEntry.message(); 
       try { 
        Schema schema = new Schema.Parser().parse(new File("src/AVROSchema.avsc")); 
        File myfile = new File("/Users/z001ldc/Desktop/myfile.txt"); 
        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 
        FileUtils.writeByteArrayToFile(myfile, consumedEncryptedMessage); 
        @SuppressWarnings("resource") 
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(myfile, reader); 
        while (dataFileReader.hasNext()) { 
         decryptedMessage = dataFileReader.next(decryptedMessage); 
         System.out.println(decryptedMessage.get("product_id").toString()); 
        } 
       } 
       catch(Exception e) { 
        e.printStackTrace(); 
        System.out.println(e); 
       } 

但我仍正在錯誤的「不是數據文件「。

回答

0

反序列化不會需要解密

首先,你讓你的架構行

schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc")); 
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 

然後

DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(consumedEncryptedMessage, reader); 
GenericRecord user = null; 
while (dataFileReader.hasNext()) { 
// Reuse user object by passing it to next(). This saves us from 
// allocating and garbage collecting many objects for files with 
// many items. 
user = dataFileReader.next(user); 
System.out.println(user); 
+0

DataFileReader dataFileReader =新DataFileReader (consumedEncryptedMessage,讀卡器); 這行顯示錯誤,因爲consumeEntryMessage不是文件,它的類型是byte []。我應該將該消息作爲文件使用嗎? –

+0

哦......沒有看到你有事件鏈,就像在模式ATPEvent中指定的那樣。你可以請嘗試從你的模式中刪除它進行測試,並試着說出System.out.println(result.get(「product_id」)。toString())。希望這是罪魁禍首,其餘的代碼看起來很好。 – tesnik03

+0

是否有任何方法只使用字節數組中的consumedentrymsg而不是File,因爲我只是將它作爲一個字節數組來使用。 –