2015-01-12 19 views
0

我有一個Avro的模式是這樣的 -如何在解碼原始字節數組後更改特定的字段值?

{ 
    "type":"record", 
    "name":"new_user", 
    "namespace":"com.hello", 
    "fields":[ 
     { 
     "name":"user_id", 
     "type":[ 
      "long", 
      "null" 
     ] 
     }, 
     { 
     "name":"segment", 
     "type":[ 
      "string", 
      "null" 
     ] 
     } 
    ] 
} 

我用我上面的Avro架構這樣的序列化數據,並且給了我一個字節數組,並工作正常 -

public static void main(String[] args) throws IOException { 
    Schema schema = new Parser() 
      .parse("{ \"type\":\"record\", \"name\":\"new_user\", \"namespace\":\"com.hello\", \"fields\":[ { \"name\":\"user_id\", \"type\":[ \"long\", \"null\" ] }, { \"name\":\"segment\", \"type\":[ \"string\", \"null\" ] } ] }"); 

    byte[] originalAvrodata = getAvroBinaryData(schema); 

    // how to get newAvroData byte array in which user_id 
    // is change to some other random long number? 
} 

private static byte[] getAvroBinaryData(Schema schema) throws IOException { 
    GenericRecord record = new GenericData.Record(schema); 
    record.put("user_id", 123456L); 
    record.put("segment", "hello"); 

    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
    ByteArrayOutputStream os = new ByteArrayOutputStream(); 

    Encoder e = EncoderFactory.get().binaryEncoder(os, null); 

    writer.write(record, e); 
    e.flush(); 
    byte[] byteData = os.toByteArray(); 
    return byteData; 
} 

問題陳述:

我需要將originalAvrodata字節數組解碼,然後將user_id字段值改變爲其它一些long編號然後構造一個newAvroData字節數組使用相同的模式應該有user_id字段值到一些隨機long數字。這有可能通過任何使用Avro的機會來實現嗎?

回答

1

當然,這裏是一些註釋代碼,它應該幫助你開始:

public static void main(String[] args) throws IOException, JSONException { 
     Schema schema = new Schema.Parser() 
      .parse("{ \"type\":\"record\", \"name\":\"new_user\", \"namespace\":\"com.hello\", \"fields\":[ { \"name\":\"user_id\", \"type\":[ \"long\", \"null\" ] }, { \"name\":\"segment\", \"type\":[ \"string\", \"null\" ] } ] }"); 

     // create example record 
     GenericRecord record = new GenericData.Record(schema); 
     record.put("user_id", 123456L); 
     record.put("segment", "hello"); 

     // serialize record 
     byte[] recordData = getAvroBinaryData(schema, record); 

     // de-serialize byte array to record 
     GenericRecord readRecord = readRecord(schema, recordData); 

     // increment user_id field 
     Long userId = (Long) readRecord.get("user_id"); 
     readRecord.put("user_id", userId + 1); 

     // prints 123457 for the user_id 
     System.out.println(readRecord); 

     // serialize updated recored 
     byte[] updatedRecordData = getAvroBinaryData(schema, readRecord); 

     // do something with updatedRecordData 
    } 

    private static GenericRecord readRecord(Schema schema, byte[] originalAvrodata) throws IOException { 
     Decoder decoder = DecoderFactory.get().binaryDecoder(originalAvrodata, null);  
     DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 
     GenericRecord readRecord = null; 

     try { 
      readRecord = reader.read(null, decoder);    
     } catch (EOFException eofe) { 
      eofe.printStackTrace(); 
     } 

     return readRecord; 
    } 

    // takes the record to be serialized as an additonal parameter 
    private static byte[] getAvroBinaryData(Schema schema, GenericRecord record) throws IOException { 
     GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema); 
     ByteArrayOutputStream os = new ByteArrayOutputStream(); 
     Encoder e = EncoderFactory.get().binaryEncoder(os, null); 
     writer.write(record, e); 
     e.flush(); 
     byte[] byteData = os.toByteArray(); 
     return byteData; 
    } 
相關問題