2013-10-02 23 views
4

我的所有程序都使用hadoop的新MR1接口(org.apache.hadoop.mapreduce)編寫,所以我想使用avro的新org.apache.avro.mapreduce。但它不適合我。如何使用AVRO org.apache.avro.mapreduce接口進行編程?

該程序接受avro數據的輸入並輸出相同。 我的程序背後的主要思想是將hadoop的Mapper和Reducer分類爲avro包裝的key/value。 這裏是我的工作驅動程序的塊:

AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema()); 
    AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema()); 

    job.setMapperClass(MyAvroMap.class); 
    job.setReducerClass(MyAvroReduce.class); 

    job.setInputFormatClass(AvroKeyInputFormat.class); 
    job.setOutputFormatClass(AvroKeyOutputFormat.class); 

    job.setMapOutputKeyClass(AvroKey.class); 
    job.setMapOutputValueClass(AvroValue.class); 

    job.setOutputKeyClass(AvroKey.class); 
    job.setOutputValueClass(NullWritable.class); 

MyAvroMap和MyAvroReduce子respectivly的定義是

public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable, 
      AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... } 

public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>, 
       AvroKey<NetflowRecord>, NullWritable>{ ... } 

的methioned NetflowRecord是我的Avro記錄類。而我得到了運行異常

java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey 

通過閱讀Hadoop的和Avro公司的源代碼, 我發現,這個例外是通過JobConf拋出,以確保 地圖關鍵是WritableComparable的子類,像這樣(hadoop1.2.1, line759)

WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class)); 

但Avro中顯示,AvroKey和AvroValue只是一個簡單的包裝沒有繼承的Hadoop的可寫*接口。

我相信,即使沒有測試,我也可以通過使用舊的mapred接口,但它不是我想要的。 你可以給我一些關於使用純org.apache.avro.mapreduce接口編程的例子或解釋嗎?

真誠,

賈敏

回答

4

經過艱苦搜索,這個補丁https://issues.apache.org/jira/browse/AVRO-593, 我弄清楚,每個AvroKey和AvroValue包裝必須在作業配置架構定義的幫助。 這就是我錯過的。

在這裏,我有兩個選擇:

  1. 如果剩餘的MyAvroMap和MyAvroReduce不變,我必須定義爲CharSequence的架構,並宣佈此架構與AvroJob的映射器輸出,如

    AvroJob .setMapOutputKeySchema(作業,<「defined-schema-for-charsequence」>); AvroJob.setMapOutputValueSchema(job,NetflowRecord.getClassSchema());

  2. 通過改變映射器輸出的key/value爲文本/ AvroValue,我只需要添加架構聲明映射輸出值,像

    job.setMapOutputKeyClass(Text.class); AvroJob.setMapOutputValueSchema(job,NetflowRecord.getClassSchema());

有了mapreduce API,我們不需要再繼承AvroMapper和AvroReducer的子類。 在這裏,我實現了option2,沒有在我的代碼中的附加架構定義。

Jamin

相關問題