2016-08-12 78 views
0

我需要訪問用Spark編寫的java程序中的avro文件數據。我可以使用MapReduce InputFormat類,但它給了我一個包含文件的每一行作爲關鍵的元組。因爲我不使用Scala,所以很難解析它。使用java在火花芯中讀取/寫入avro文件

JavaPairRDD<AvroKey<GenericRecord>, AvroValue> avroRDD = sc.newAPIHadoopFile("dataset/testfile.avro", AvroKeyInputFormat.class, AvroKey.class, NullWritable.class,new Configuration()); 

是否有任何工具類或jar可用,我可以用它來將avro數據直接映射到java類。例如。 codehaus.jackson軟件包提供了將json映射到java類的規定。

否則是否有任何其他方法可以輕鬆地將存在於avro文件中的字段解析爲java類或RDD。

回答

1

考慮你的avro文件包含序列化對,密鑰是String,值是一個avro類。然後,你可以有一些Utils類,看起來像這樣的一個通用的靜態函數:

public class Utils { 

    public static <T> JavaPairRDD<String, T> loadAvroFile(JavaSparkContext sc, String avroPath) { 
    JavaPairRDD<AvroKey, NullWritable> records = sc.newAPIHadoopFile(avroPath, AvroKeyInputFormat.class, AvroKey.class, NullWritable.class, sc.hadoopConfiguration()); 
    return records.keys() 
     .map(x -> (GenericRecord) x.datum()) 
     .mapToPair(pair -> new Tuple2<>((String) pair.get("key"), (T)pair.get("value"))); 
    } 
} 

然後你可以使用的方法是這樣的:

JavaPairRDD<String, YourAvroClassName> records = Utils.<YourAvroClassName>loadAvroFile(sc, inputDir); 

您可能還需要使用KryoSerializer和註冊您的自定義KryoRegistrator

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
sparkConf.set("spark.kryo.registrator", "com.test.avro.MyKryoRegistrator"); 

而且registrator類看起來是這樣的:

public class MyKryoRegistrator implements KryoRegistrator { 

    public static class SpecificInstanceCollectionSerializer<T extends Collection> extends CollectionSerializer { 
    Class<T> type; 
    public SpecificInstanceCollectionSerializer(Class<T> type) { 
     this.type = type; 
    } 

    @Override 
    protected Collection create(Kryo kryo, Input input, Class<Collection> type) { 
     return kryo.newInstance(this.type); 
    } 

    @Override 
    protected Collection createCopy(Kryo kryo, Collection original) { 
     return kryo.newInstance(this.type); 
    } 
    } 


    Logger logger = LoggerFactory.getLogger(this.getClass()); 

    @Override 
    public void registerClasses(Kryo kryo) { 
    // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type 
    // because Kryo is not able to serialize them properly, we use this serializer for them 
    kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer<>(ArrayList.class)); 
    kryo.register(YourAvroClassName.class); 
    } 
}