MapReduce custom key not working
I'm new to MapReduce and am working through a few problems to learn by implementing them.
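Background:
I'm using the movielens.com dataset, which contains ratings for many movies. I want to count the ratings for each movie to find the most-rated movies, and sort the final output by that rating count in descending order. By default the output is sorted by movie ID. I want output like this:
movieId: RATING_COUNT (sorted by RATING_COUNT, descending)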
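For reference, here is the intended result computed in memory, without MapReduce. This is a minimal sketch. It assumes each input line has the form UserID::MovieID::Rating::Timestamp, because the mapper in the main class reads field [1] as the movie ID. It counts the ratings per movie and sorts by count, highest first. The class name RatingCountSketch and the sample lines are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch (not MapReduce): counts ratings per movie
// and orders the result by count, highest first.
public class RatingCountSketch {

    // Assumes lines look like UserID::MovieID::Rating::Timestamp,
    // so the movie ID is field [1], the same field the mapper reads.
    static List<Map.Entry<Integer, Integer>> countAndSort(List<String> lines) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String line : lines) {
            int movieId = Integer.parseInt(line.split("::")[1]);
            counts.merge(movieId, 1, Integer::sum); // add one rating for this movie
        }
        List<Map.Entry<Integer, Integer>> entries = new ArrayList<>(counts.entrySet());
        // Descending by count; ties broken by ascending movieId for a deterministic order.
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : Integer.compare(a.getKey(), b.getKey());
        });
        return entries;
    }

    public static void main(String[] args) {
        // Made-up sample lines in the assumed format.
        List<String> lines = List.of(
            "1::10::5::0",
            "2::20::3::0",
            "3::10::4::0",
            "4::30::4::0",
            "5::10::2::0",
            "6::20::5::0");
        for (Map.Entry<Integer, Integer> e : countAndSort(lines)) {
            System.out.println(e.getKey() + ":" + e.getValue()); // 10:3, then 20:2, then 30:1
        }
    }
}
```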
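I searched the web and found that I could do this with a custom key. I tried that, but I'm not getting the correct result.
While debugging, I found that the mapper works fine and the problem is in the reducer. There, the input key is always the last record in my file, that is, the last record the mapper processed, so the output is wrong.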
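One thing worth checking is how Hadoop groups keys. Hadoop calls reduce() once for each run of sorted keys that the grouping comparator treats as equal. Unless job.setGroupingComparatorClass is set, the grouping comparator is the sort comparator. Both CompositeKey.compareTo and CompositeKeyComparator compare only the count, and the mapper always emits count = 1. As a result, every key may compare as equal and end up in a single reduce() call. Hadoop also reuses the key object and refills it as the values are iterated, so the movieId you see in the key depends on when you read it.

The Hadoop-free sketch below imitates that grouping with a TreeMap, which also treats keys whose comparison returns 0 as the same key. Key and GroupingSketch are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hadoop-free illustration: keys that a comparator reports as equal (0)
// collapse into one group, much like one reduce() call per equal-key run.
public class GroupingSketch {

    // Hypothetical stand-in for CompositeKey (no Hadoop types).
    static final class Key {
        final int movieId;
        final int count;
        Key(int movieId, int count) { this.movieId = movieId; this.count = count; }
    }

    // Mirrors the question's comparison: only the count is compared (descending).
    static final Comparator<Key> BY_COUNT_DESC =
        (a, b) -> Integer.compare(b.count, a.count);

    // Returns how many distinct groups the comparator produces.
    static int countGroups(List<Key> mapOutput, Comparator<Key> cmp) {
        TreeMap<Key, List<Integer>> groups = new TreeMap<>(cmp);
        for (Key k : mapOutput) {
            groups.computeIfAbsent(k, x -> new ArrayList<>()).add(1);
        }
        return groups.size();
    }

    public static void main(String[] args) {
        // Every key has count = 1, as the mapper emits them.
        List<Key> mapOutput = List.of(new Key(10, 1), new Key(20, 1), new Key(10, 1), new Key(30, 1));
        System.out.println(countGroups(mapOutput, BY_COUNT_DESC)); // 1 group for all movies
        Comparator<Key> byMovieId = Comparator.comparingInt(k -> k.movieId);
        System.out.println(countGroups(mapOutput, byMovieId)); // 3 groups, one per movie
    }
}
```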
Here are my classes for reference:
Main class:
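import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public final class MovieLens_CustomSort {

    public static class Map extends Mapper<LongWritable, Text, CompositeKey, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private IntWritable movieId;

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input fields are separated by "::"; field [1] is the movie ID.
            String row = value.toString();
            String[] splitRow = row.split("::");
            // Every record is emitted with count = 1.
            CompositeKey compositeKey = new CompositeKey(Integer.valueOf(splitRow[1]), 1);
            context.write(compositeKey, one);
        }
    }

    public static class Reduce extends Reducer<CompositeKey, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(CompositeKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            Text outputKey = new Text(key.toString());
            Iterator<IntWritable> iterator = values.iterator();
            // Add up the 1s emitted for this key group.
            while (iterator.hasNext()) {
                sum += iterator.next().get();
            }
            context.write(outputKey, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "max movie review");
        job.setJarByClass(MovieLens_CustomSort.class); // lets Hadoop locate the job jar on a cluster
        job.setSortComparatorClass(CompositeKeyComparator.class);
        job.setMapOutputKeyClass(CompositeKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}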
Custom key:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public final class CompositeKey implements WritableComparable<CompositeKey> {
    private int m_movieId;
    private int m_count;

    // No-arg constructor required by Hadoop to create keys before readFields().
    public CompositeKey() {
    }

    public CompositeKey(int movieId, int count) {
        m_count = count;
        m_movieId = movieId;
    }

    // Descending by count only; the movie ID is not compared.
    @Override
    public int compareTo(CompositeKey o) {
        return Integer.compare(o.getCount(), this.getCount());
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(m_movieId);
        out.writeInt(m_count);
    }

    // Fields are read back in the same order write() emits them.
    @Override
    public void readFields(DataInput in) throws IOException {
        m_movieId = in.readInt();
        m_count = in.readInt();
    }

    public int getCount() {
        return m_count;
    }

    public int getMovieId() {
        return m_movieId;
    }

    @Override
    public String toString() {
        return "MovieId = " + m_movieId + " , count = " + m_count;
    }
}
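Because write() and readFields() must handle the fields in the same order, a quick way to check a custom key outside Hadoop is a serialization round trip through plain java.io streams. This is a minimal sketch. Key is a hypothetical stand-in that copies CompositeKey's write/readFields bodies, so the example runs without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hadoop-free round-trip check: serialize a key, read it back, compare fields.
public class KeyRoundTripSketch {

    // Hypothetical stand-in with the same write/readFields bodies as CompositeKey.
    static final class Key {
        int movieId;
        int count;
        Key() { } // Hadoop likewise needs a no-arg constructor to deserialize keys
        Key(int movieId, int count) { this.movieId = movieId; this.count = count; }

        void write(DataOutput out) throws IOException {
            out.writeInt(movieId);
            out.writeInt(count);
        }

        void readFields(DataInput in) throws IOException {
            movieId = in.readInt();
            count = in.readInt();
        }
    }

    static Key roundTrip(Key original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            Key copy = new Key();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            // In-memory streams should not fail; wrap just in case.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Key copy = roundTrip(new Key(10, 1));
        System.out.println(copy.movieId + " " + copy.count); // 10 1
    }
}
```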
Custom key comparator:
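import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CompositeKeyComparator extends WritableComparator {
    protected CompositeKeyComparator() {
        // true: create key instances so compare() receives deserialized keys
        super(CompositeKey.class, true);
    }

    // Descending by count only (arguments swapped); the movie ID is not compared.
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        CompositeKey c1 = (CompositeKey) w1;
        CompositeKey c2 = (CompositeKey) w2;
        return Integer.compare(c2.getCount(), c1.getCount());
    }
}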
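Swapping the arguments, as in Integer.compare(c2.getCount(), c1.getCount()), is what produces descending order. If two different movies should never compare as equal, one option is to break ties on the movie ID. The sketch below assumes the per-movie totals already exist, for example from a first counting pass, because the mapper here always emits a count of 1. DescendingCountSketch and its Key class are hypothetical and independent of Hadoop.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hadoop-free sketch: order by count descending, then by movieId ascending,
// so two different movies never compare as equal.
public class DescendingCountSketch {

    // Hypothetical (movieId, total count) pair.
    static final class Key {
        final int movieId;
        final int count;
        Key(int movieId, int count) { this.movieId = movieId; this.count = count; }
        @Override public String toString() { return movieId + ":" + count; }
    }

    static final Comparator<Key> COUNT_DESC_THEN_ID = (a, b) -> {
        int byCount = Integer.compare(b.count, a.count); // swapped arguments => descending
        return byCount != 0 ? byCount : Integer.compare(a.movieId, b.movieId);
    };

    // Returns a sorted copy; the input list is left unchanged.
    static List<Key> sorted(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(COUNT_DESC_THEN_ID);
        return copy;
    }

    public static void main(String[] args) {
        // Assumed input: totals already computed per movie.
        List<Key> totals = List.of(new Key(30, 1), new Key(10, 3), new Key(20, 3));
        System.out.println(sorted(totals)); // [10:3, 20:3, 30:1]
    }
}
```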
P.S.: I know my key class doesn't make much sense, but I created it for learning purposes.