2017-07-14

MapReduce custom key not working

I am new to MapReduce and am working through some problems so that I can learn it better by implementing things myself.

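Background

I have a dataset from movielens.com that contains ratings for various movies. I am trying to find the movies with the most ratings and sort the final output in descending order of rating count (by default the output is sorted by movie ID). I want something like this: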

movieId:RATING_COUNT (sorted by RATING_COUNT in descending order)
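
To make the target output concrete, here is a minimal in-memory sketch in plain Java (no Hadoop involved). It assumes each input line looks like `userId::movieId::rating::timestamp`, which is inferred from the `split("::")` call and the use of index 1 in the mapper; the class name `RatingCountSketch`, the method `countAndSort`, and the sample rows are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: computes the desired "movieId:count" listing in memory.
final class RatingCountSketch {

    // Counts ratings per movie ID and returns "movieId:count" strings,
    // ordered by count from highest to lowest.
    static List<String> countAndSort(List<String> lines) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String line : lines) {
            String[] fields = line.split("::");
            int movieId = Integer.parseInt(fields[1]); // assumed position of the movie ID
            counts.merge(movieId, 1, Integer::sum);
        }

        List<Map.Entry<Integer, Integer>> entries = new ArrayList<>(counts.entrySet());
        // Descending by count; ties are broken by ascending movie ID.
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : Integer.compare(a.getKey(), b.getKey());
        });

        List<String> result = new ArrayList<>();
        for (Map.Entry<Integer, Integer> entry : entries) {
            result.add(entry.getKey() + ":" + entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample rows, not taken from the real dataset.
        List<String> sample = Arrays.asList(
                "1::10::5::100",
                "1::20::3::101",
                "2::10::4::102",
                "3::10::2::103",
                "3::30::5::104",
                "2::20::1::105");
        for (String row : countAndSort(sample)) {
            System.out.println(row);
        }
    }
}
```

Note that the sort happens only after every count is known. In a MapReduce job the counts do not exist until the reducer has run, so ordering by count would typically need a further step (for example a second job keyed on the count); that is a general observation, not something stated in the question.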

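I searched the web and found that I could achieve this with a custom key. I tried to use one, but I am not getting the correct result.

While debugging, I found that everything works fine in the mapper and that the problem is in the reducer. In the reducer, the input key is always the last record in my file, i.e. the last record processed by the mapper, so I get the wrong output.

I am attaching my classes for reference:

Main class: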

public final class MovieLens_CustomSort { 

public static class Map extends Mapper<LongWritable, Text, CompositeKey, IntWritable> { 

    private IntWritable one = new IntWritable(1); 
    private IntWritable movieId; 

    @Override 
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
     String row = value.toString(); 
     String splitRow[] = row.split("::"); 
     CompositeKey compositeKey = new CompositeKey(Integer.valueOf(splitRow[1]), 1); 
     context.write(compositeKey, one); 
    } 
} 

public static class Reduce extends Reducer<CompositeKey, IntWritable, Text, IntWritable> { 

    @Override 
    protected void reduce(CompositeKey key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 
     int sum = 0; 
     Text outputKey = new Text(key.toString()); 

     Iterator<IntWritable> iterator = values.iterator(); 
     while (iterator.hasNext()) { 
      sum += iterator.next().get(); 
     } 
     context.write(outputKey, new IntWritable(sum)); 
    } 
} 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
    Configuration conf = new Configuration(); 
    Job job = Job.getInstance(conf, "max movie review"); 

    job.setSortComparatorClass(CompositeKeyComparator.class); 
    job.setMapOutputKeyClass(CompositeKey.class); 
    job.setMapOutputValueClass(IntWritable.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    job.setMapperClass(Map.class); 
    job.setReducerClass(Reduce.class); 

    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    job.waitForCompletion(true); 
} } 

Custom key:

public final class CompositeKey implements WritableComparable<CompositeKey> { 

private int m_movieId; 
private int m_count; 

public CompositeKey() { 

} 

public CompositeKey(int movieId, int count) { 
    m_count = count; 
    m_movieId = movieId; 
} 

@Override 
public int compareTo(CompositeKey o) { 
    return Integer.compare(o.getCount(), this.getCount()); 
} 

@Override 
public void write(DataOutput out) throws IOException { 
    out.writeInt(m_movieId); 
    out.writeInt(m_count); 
} 

@Override 
public void readFields(DataInput in) throws IOException { 
    m_movieId = in.readInt(); 
    m_count = in.readInt(); 
} 

public int getCount() { 
    return m_count; 
} 

public int getMovieId() { 
    return m_movieId; 
} 

@Override 
public String toString() { 
    return "MovieId = " + m_movieId + " , count = " + m_count; 
}} 

Custom key comparator:

public class CompositeKeyComparator extends WritableComparator { 

protected CompositeKeyComparator() { 
    super(CompositeKey.class, true); 
} 

@Override 
public int compare(WritableComparable w1, WritableComparable w2) { 
    CompositeKey c1 = (CompositeKey)w1; 
    CompositeKey c2 = (CompositeKey)w2; 

    return Integer.compare(c2.getCount(), c1.getCount()); 
}} 

P.S.: I know my key class does not make much sense, but it was created for learning purposes.

Answer


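I have solved the problem. The issue was in CompositeKeyComparator: I was comparing on the count, which is 1 for every record coming out of the mapper, so every record compared as equal. Once I changed it to compare on the movie ID, it worked fine.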
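
The effect can be reproduced without Hadoop. The sketch below is a simplified analogy, not the actual shuffle implementation: it uses a `TreeMap`, which, like the reducer-side grouping, treats two keys as the same key whenever the comparator returns 0. (In Hadoop, when no grouping comparator is set on the job, the sort comparator also decides which keys end up in the same `reduce` call.) The class `GroupingSketch`, its nested `Key` class, and the sample IDs are hypothetical stand-ins for `CompositeKey` and real data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Illustration only: shows how a comparator that always returns 0 merges all keys.
final class GroupingSketch {

    // Hypothetical stand-in for CompositeKey.
    static final class Key {
        final int movieId;
        final int count;

        Key(int movieId, int count) {
            this.movieId = movieId;
            this.count = count;
        }
    }

    // Same logic as the original comparator: descending by count.
    static final Comparator<Key> BY_COUNT_DESC =
            (a, b) -> Integer.compare(b.count, a.count);

    // The fix described in the answer: compare on the movie ID instead.
    static final Comparator<Key> BY_MOVIE_ID =
            Comparator.comparingInt(k -> k.movieId);

    // Returns the number of distinct groups the comparator produces.
    static int countGroups(List<Key> keys, Comparator<Key> comparator) {
        TreeMap<Key, List<Integer>> groups = new TreeMap<>(comparator);
        for (Key key : keys) {
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(1);
        }
        return groups.size();
    }

    // Mapper-style output: every key carries count = 1 (made-up movie IDs).
    static List<Key> sampleKeys() {
        return Arrays.asList(new Key(10, 1), new Key(20, 1), new Key(10, 1), new Key(30, 1));
    }

    public static void main(String[] args) {
        // All counts are 1, so every key compares as equal: one group.
        System.out.println(countGroups(sampleKeys(), BY_COUNT_DESC));
        // Comparing on movie ID separates the three distinct movies.
        System.out.println(countGroups(sampleKeys(), BY_MOVIE_ID));
    }
}
```

With the count-based comparator the four sample keys collapse into a single group, which matches the symptom of the reducer seeing only one key. Comparing on the movie ID restores one group per movie, although the output is then ordered by movie ID rather than by rating count.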