2016-05-03 88 views
0

問題陳述:使用 MapReduce 處理 Avro 數據 —— 通用解析

  1. HDFS 上的數據以 Avro 格式提供。
  2. 上述avro數據的模式也可用。
  3. 這個Avro數據需要在map reduce中解析並生成具有相同模式的輸出avro數據(需要清理傳入的Avro數據)。
  4. 傳入的avro數據可以是任何模式。

因此,需求是編寫一個通用的 MapReduce 作業,它可以接受任何模式的 Avro 數據,並以相同的模式生成 Avro 格式的輸出。

代碼(經過多次嘗試後,這是我目前的進展)

驅動

public class AvroDriver extends Configured implements Tool { 

    public int run(String[] args) throws Exception { 
     Job job = new Job(getConf()); 
     job.setJarByClass(AvroMapper.class); 
     job.setJobName("Avro With Xml Mapper"); 
     job.getConfiguration().setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true); 

     //This is required to use avro-1.7.6 and above 
     job.getConfiguration().set("mapreduce.job.user.classpath.first", "true"); 
     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.setInputFormatClass(AvroKeyInputFormat.class); 
     job.setMapperClass(AvroMapper.class); 
     Schema schema = new Schema.Parser().parse(new File(args[2])); 
     AvroJob.setInputKeySchema(job, schema); 
     job.setOutputFormatClass(AvroKeyOutputFormat.class); 
     job.setMapOutputKeyClass(AvroKey.class); 
     AvroJob.setOutputKeySchema(job, schema); 
     job.setNumReduceTasks(0); 
     return (job.waitForCompletion(true) ? 0 : 1); 
    } 

    public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new AvroDriver(), args); 
     System.exit(res); 
    } 
} 

映射

/**
 * First attempt from the question: copies every field of the incoming record
 * into a new record built on the same schema, then writes an {@code AvroKey}.
 *
 * <p>NOTE(review): the key that is written wraps a {@code GenericData} instance,
 * not the copied record. This looks like the source of the NullPointerException
 * reported below; the accepted answer emits {@code GenericData.Record} instead.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> { 

     /**
      * Copies the incoming datum field by field and writes the result with a null value.
      *
      * @param key     wrapper around the incoming generic record
      * @param value   unused
      * @param context task context used to write the output
      * @throws IOException for any exception raised in the body (rethrown with the message only)
      * @throws InterruptedException declared by the overridden method
      */
     @Override 
     public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException { 

      try { 
       // Debug output, printed for every input record.
       System.out.println("Specific Record - " + key); 
       System.out.println("Datum :: " + key.datum()); 
       System.out.println("Schema :: " + key.datum().getSchema()); 
       List<Field> fields = key.datum().getSchema().getFields(); 


       // New record on the input schema, filled by field name.
       GenericRecord record = new GenericData.Record(key.datum().getSchema()); 
       for(Field f : fields) { 
        System.out.println("Field Name - " + f.name()); 
        record.put(f.name(), key.datum().get(f.name())); 
       } 
       System.out.println("Record - " + record); 
       GenericData d = new GenericData(); 
       // The return value of newRecord is discarded, so 'record' is never attached to 'd'.
       d.newRecord(record, key.datum().getSchema()); 
       // The output datum is the GenericData instance itself, not 'record'.
       AvroKey<GenericData> outkey = new AvroKey<GenericData>(d); 

       System.out.println("Generic Record (Avro Key) - " + outkey); 
       context.write(outkey, NullWritable.get()); 
      } catch (Exception e) { 
       e.printStackTrace(); 
       // Only the message is carried over; the original exception is not attached as the cause.
       throw new IOException(e.getMessage()); 
      } 
     } 
    } 

命令

hadoop jar $jar_name $input_avro_data_path $output_path $path_to_the_input_avro_schema

Avro的模式樣本

{ "type" : "record", "name" : "Entity", "namespace" : "com.sample.avro", "fields"....... 

問題:當我運行這個 MapReduce 作業時,出現以下錯誤

Error running child : java.lang.NullPointerException: in com.sample.avro.Entity

org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException: in com.sample.avro.Entity null of com.sample.avro.Entity

環境

HDP 2.3沙盒

關於「null of com.sample.avro.Entity」這個錯誤,有任何想法嗎?

修訂

我嘗試了以下做法,但結果相同

/**
 * Second attempt from the question: builds a fresh output schema with the same
 * record name and namespace as the input, copies the field values into a record
 * on that schema, then writes an {@code AvroKey}.
 *
 * <p>NOTE(review): as in the first attempt, the key that is written wraps a
 * {@code GenericData} instance rather than the record; the accepted answer
 * emits {@code GenericData.Record} instead.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData>, NullWritable> { 

     /**
      * Rebuilds the schema, copies the incoming datum and writes it with a null value.
      *
      * @param key     wrapper around the incoming generic record
      * @param value   unused
      * @param context task context used to write the output
      * @throws IOException declared by the overridden method
      * @throws InterruptedException declared by the overridden method
      */
     @Override 
     public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException { 

      try { 
       // Debug output, printed for every input record.
       System.out.println("Specific Record - " + key); 
       System.out.println("Datum :: " + key.datum()); 
       System.out.println("Schema :: " + key.datum().getSchema()); 
       List<Field> fields = key.datum().getSchema().getFields(); 

       // New record schema reusing the input's name and namespace; rebuilt on every call.
       Schema s = Schema.createRecord(key.datum().getSchema().getName(), null, key.datum().getSchema().getNamespace(), false); 
       List<Field> outFields = new ArrayList<Field>(); 
       for(Field f : fields) { 
        System.out.println("Field Name - " + f.name()); 
        // Every output field is declared as STRING, whatever the input field's type is.
        Schema.Field f1 = new Schema.Field(f.name(),Schema.create(Schema.Type.STRING), null,null); 
        outFields.add(f1); 
       } 
       s.setFields(outFields); 

       System.out.println("Out Schema - " + s); 
       GenericRecord record = new GenericData.Record(s); 
       // Values are copied as-is; nothing here converts them to strings.
       for(Field f : fields) { 
        record.put(f.name(), key.datum().get(f.name())); 
       } 
       System.out.println("Record - " + record); 
       GenericData d = new GenericData(); 
       // The return value of newRecord is discarded, so 'record' is never attached to 'd'.
       d.newRecord(record, s); 
       // The output datum is the GenericData instance itself, not 'record'.
       AvroKey<GenericData> outkey = new AvroKey<GenericData>(d); 
       System.out.println("Generic Record (Avro Key) - " + outkey.datum()); 
       context.write(outkey, NullWritable.get()); 
      } catch (Exception e) { 
       // The exception is only printed, not rethrown: map() returns normally after a failure.
       e.printStackTrace(); 
      } 

     } 
    } 

請注意,以 Avro 作為 MapReduce 的輸入可以正常工作,這裏的問題出在以 Avro 格式輸出。

回答

0

最後,我找到了答案,Mapper 代碼如下。我不再輸出包裝 GenericData 的 AvroKey,而是改為輸出包裝 GenericData.Record 的 AvroKey。

/**
 * Schema-agnostic mapper: emits, for every incoming generic record, a new
 * {@code GenericData.Record} built on the incoming record's own schema.
 *
 * <p>The output key wraps the record itself ({@code AvroKey<GenericData.Record>}),
 * not a {@code GenericData} instance. Any per-field cleaning belongs inside the
 * copy loop.
 */
public static class AvroMapper extends Mapper<AvroKey<GenericData.Record>, NullWritable, AvroKey<GenericData.Record>, NullWritable> {

     /**
      * Copies the incoming record field by field and writes the copy with a null value.
      *
      * @param key     wrapper around the incoming generic record
      * @param value   unused
      * @param context task context used to write the output
      * @throws IOException if writing fails, or if copying the record raises a runtime
      *                     exception (attached as the cause)
      * @throws InterruptedException if the write is interrupted
      */
     @Override
     public void map(AvroKey<GenericData.Record> key, NullWritable value, Context context) throws IOException, InterruptedException {

      try {
       GenericData.Record input = key.datum();

       // Reuse the schema carried by the datum, so field types are preserved
       // and no schema is rebuilt per record.
       Schema schema = input.getSchema();

       GenericData.Record record = new GenericData.Record(schema);
       for (Field f : schema.getFields()) {
        // Same schema on both sides, so field positions line up.
        record.put(f.pos(), input.get(f.pos()));
       }

       context.write(new AvroKey<GenericData.Record>(record), NullWritable.get());
      } catch (RuntimeException e) {
       // IOException and InterruptedException propagate untouched; runtime
       // failures are wrapped with the cause attached so the stack trace survives.
       throw new IOException("Failed to copy Avro record: " + e.getMessage(), e);
      }
     }
    }