我的所有程序都使用hadoop的新MR1接口(org.apache.hadoop.mapreduce)編寫,所以我想使用avro的新org.apache.avro.mapreduce。但它不適合我。如何使用AVRO org.apache.avro.mapreduce接口進行編程?
該程序接受avro數據的輸入並輸出相同。 我的程序背後的主要思想是將hadoop的Mapper和Reducer分類爲avro包裝的key/value。 這裏是我的工作驅動程序的塊:
AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());
job.setMapperClass(MyAvroMap.class);
job.setReducerClass(MyAvroReduce.class);
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
job.setOutputKeyClass(AvroKey.class);
job.setOutputValueClass(NullWritable.class);
MyAvroMap和MyAvroReduce子respectivly的定義是
public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... }
public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>,
AvroKey<NetflowRecord>, NullWritable>{ ... }
的methioned NetflowRecord是我的Avro記錄類。而我得到了運行異常
java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey
通過閱讀Hadoop的和Avro公司的源代碼, 我發現,這個例外是通過JobConf拋出,以確保 地圖關鍵是WritableComparable的子類,像這樣(hadoop1.2.1, line759)
WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
但Avro中顯示,AvroKey和AvroValue只是一個簡單的包裝沒有繼承的Hadoop的可寫*接口。
我相信,即使沒有測試,我也可以通過使用舊的mapred接口,但它不是我想要的。 你可以給我一些關於使用純org.apache.avro.mapreduce接口編程的例子或解釋嗎?
真誠,
賈敏