2017-04-25 77 views
0

我試圖實現不同模式:定製的可寫的Hadoop MapReduce的不同模式產生重複鍵

map(key, record): 
    emit record,null 
reduce(key, records): 
    emit key 

我的鑰匙是一個複雜的,定製Writable。如果我發出減少鍵及其散列碼:我收到以下輸出

context.write(key, new IntWtitable(key.hashCode()); 

key1 -1808937256 
key2 -768063202 
key3 906064410 
key2 -768063202 
key3 906064410 

從理論上講,輸出應該只包含key1key2key3因爲我使用HashPartitioner:具有相同散列碼的密鑰被組合到相同的分區中。這顯然不是這種情況。

如果我將我的複雜WritableText對象(並相應地調整映射/減速類),並在Mapper發出:

context.write(new Text(key.toString()), NullWritable.get()); 

...如預期的輸出:

key1 1013632023 
key2 762485389 
key3 -1193948769 

好吧,這裏是一個最小的工作示例,說明行爲。

輸入:

A A A A A 
B B B B B 
C C C C C 
A A A A A 
B B B B B 

的MapReduce工作:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.ArrayWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 


public class DistinctPattern extends Configured implements Tool { 
public static class DistinctMapper extends Mapper<Object, Text, ComplexObject, NullWritable> { 


    public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
     ComplexObject o = new ComplexObject(value.toString()); 
     context.write(o, NullWritable.get()); 
    } 
} 

public static class DistinctReducer extends Reducer<ComplexObject, NullWritable, ComplexObject, IntWritable> { 


    public void reduce(ComplexObject key, Iterable<NullWritable> values, Context context) 
      throws IOException, InterruptedException { 

     context.write(key, new IntWritable(key.hashCode())); 
    } 
} 

public static class MyArrayWritable extends ArrayWritable { 

    public MyArrayWritable(Writable[] values) { 
     super(DatumObject.class, values); 
    } 

    public MyArrayWritable() { 
     super(DatumObject.class); 
    } 

    @Override 
    public String toString() { 
     return Arrays.toString(get()); 
    } 

} 

public static class DatumObject implements Writable { 
    private String datum; 

    public DatumObject() {} 

    public DatumObject(String d) { 
     datum = d; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     datum = in.readUTF(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     out.writeUTF(datum);  
    } 

    @Override 
    public String toString() { 
     return datum; 
    } 

    @Override 
    public int hashCode() { 
     return 31 * datum.hashCode(); 
    } 

} 

public static class ComplexObject implements WritableComparable<ComplexObject> { 
    private List<DatumObject> data = new ArrayList<>(); 

    public ComplexObject() {} 

    public ComplexObject(String d) { 
     String[] elements = d.split(" "); 
     for(int i = 0; i < elements.length; i++) 
      data.add(new DatumObject(elements[i])); 
    } 

    public int size() { 
     return data.size(); 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     data.clear(); 
     MyArrayWritable m = new MyArrayWritable(); 
     m.readFields(in); 
     Writable[] w = m.get(); 
     for(int i = 0; i < w.length; i++) 
      data.add((DatumObject) w[i]); 

    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     MyArrayWritable m = new MyArrayWritable(data.toArray(new DatumObject[data.size()])); 
     m.write(out); 
    } 

    @Override 
    public int compareTo(ComplexObject o) { 
     if(this.equals(o)) 
      return 0; 

     if(o.size() < this.size()) 
      return -1; 

     return 1; 
    } 

    @Override 
    public boolean equals(Object obj) { 
     if(!(obj instanceof ComplexObject)) 
      return false; 

     ComplexObject other = (ComplexObject) obj; 
     return other.data.equals(data); 
    } 

    @Override 
    public int hashCode() { 
     return 31 * data.hashCode(); 
    } 

    @Override 
    public String toString() { 
     StringBuilder s= new StringBuilder(); 
     data.forEach(entry -> { 
      s.append(entry); 
      s.append(" "); 
     }); 

     return s.toString(); 
    } 

} 

@Override 
public int run(String[] args) throws Exception { 
    Job job = Job.getInstance(); 
    job.setJar("distinct.jar"); 
    job.setJarByClass(DistinctPattern.class); 
    job.setMapperClass(DistinctMapper.class); 
    job.setReducerClass(DistinctReducer.class); 
    job.setMapOutputKeyClass(ComplexObject.class); 
    job.setMapOutputValueClass(NullWritable.class); 
    job.setOutputKeyClass(ComplexObject.class); 
    job.setOutputValueClass(IntWritable.class); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    return job.waitForCompletion(true) ? 0 : 1; 
} 

public static void main(String[] args) throws Exception {  
    int exitCode = ToolRunner.run(new DistinctPattern(), args); 
    System.exit(exitCode); 
} 
} 

預期輸出:

A A A A A  368623362 
B B B B B  1285710467 
C C C C C  -2092169724 

實際輸出:

A A A A A  368623362 
B B B B B  1285710467 
C C C C C  -2092169724 
A A A A A  368623362 
B B B B B  1285710467 

我錯過了什麼?

PS:Hadoop的2.7.3

回答

0

好了,發現在我的代碼錯誤(或多個)。首先,最小工作示例缺少equals方法的DatumObject類的實現:

@Override 
public boolean equals(Object obj) { 
    if(obj == null) 
     return false; 

    if(!(obj instanceof DatumObject)) 
     return false; 

    DatumObject other = (DatumObject) obj; 
     return other.datum.equals(datum); 
} 

其次,一方面我無法在最小工作示例重現,但它出現在我的實際代碼,是不是所有的我的key類確實實現了WritableComparable接口。因此,我懷疑洗牌階段沒有按預期對鍵進行分類。一旦compareTo方法在構成我的key值(see class diagram here)的所有類中正確實現,則不同的模式按預期工作。