0
我想解析一個卡夫卡消息,它是在一些加密的AVRO格式。我有以下AvroSchema.avsc Avro的模式文件:解密卡夫卡Avro消息
{
"type": "record",
"namespace": "kafka.events",
"name": "AvroSchema",
"fields": [
{ "name": "product_id", "type": "string" },
{ "name": "available_to_promise_quantity", "type": "double" },
{ "name": "online_available_to_promise_quantity", "type": "double" },
{ "name": "stores_available_to_promise_quantity", "type": "double" },
{ "name": "is_infinite_inventory", "type": "boolean", "default" : false },
{ "name": "event_timestamp", "type": "long" },
{ "name": "previous_event", "type": "AvroSchema" }
]
}
現在,我寫了下面的代碼來獲得JSON格式的數據:
for (final KafkaStream<byte[], byte[]> stream : streams){
ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();
byte[] consumedEncryptedMessage;
MessageAndMetadata<byte[], byte[]> consumedEntry;
while(consumerIterator.hasNext()){
consumedEntry = consumerIterator.next();
if(null != consumedEntry){
consumedEncryptedMessage = consumedEntry.message();
try {
Schema schema = null;
schema = new Schema.Parser().parse(new File("src/AvroSchema.avsc"));
DatumReader<GenericRecord> reader = new SpecificDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(consumedEncryptedMessage , null);
GenericRecord decryptedmsg = null;
decryptedmsg = reader.read(null, decoder);
System.out.println(decryptedmsg);
}
catch(Exception e) {
e.printStackTrace();
System.out.println(e);
}
請幫助我如何對消息進行解密。
加密字節的消息是這樣的類型:080-21-0001 :�Aw�@@��A�ǐ�U :�Aw�@@��A
我所做的更改的建議,現在我有以下一段代碼:
while(consumerIterator.hasNext()){
consumedEntry = consumerIterator.next();
if(null != consumedEntry){
consumedEncryptedMessage = consumedEntry.message();
try {
Schema schema = new Schema.Parser().parse(new File("src/AVROSchema.avsc"));
File myfile = new File("/Users/z001ldc/Desktop/myfile.txt");
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
FileUtils.writeByteArrayToFile(myfile, consumedEncryptedMessage);
@SuppressWarnings("resource")
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(myfile, reader);
while (dataFileReader.hasNext()) {
decryptedMessage = dataFileReader.next(decryptedMessage);
System.out.println(decryptedMessage.get("product_id").toString());
}
}
catch(Exception e) {
e.printStackTrace();
System.out.println(e);
}
但我仍正在錯誤的「不是數據文件「。
DataFileReader dataFileReader =新DataFileReader (consumedEncryptedMessage,讀卡器); 這行顯示錯誤,因爲consumeEntryMessage不是文件,它的類型是byte []。我應該將該消息作爲文件使用嗎? –
哦......沒有看到你有事件鏈,就像在模式ATPEvent中指定的那樣。你可以請嘗試從你的模式中刪除它進行測試,並試着說出System.out.println(result.get(「product_id」)。toString())。希望這是罪魁禍首,其餘的代碼看起來很好。 – tesnik03
是否有任何方法只使用字節數組中的consumedentrymsg而不是File,因爲我只是將它作爲一個字節數組來使用。 –