以下是一個地圖縮小程序,其中在地圖功能中進行過濾並在縮小步驟中進行求和。MapReduce:在寫入上下文時無限期地減少停止
地圖部分執行得很好。但是當reduce部分運行時,它會卡在context.write(key,value)。
這種情況特別是只有當我嘗試寫不同的輸出減少功能型比什麼是寫在地圖的功能
public class Filter3 {
public static class TokenizerMapper extends Mapper<Object, Text, Text, Contestant>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] cols = value.toString().split(",");
try {
Contestant val = new Contestant(cols[0],cols[1],cols[2]);
System.out.println();
System.out.println();
System.out.print(key+" ::: ");
System.out.println(val);
System.out.println();
System.out.println();
val.name = val.name.toUpperCase();
if(val.rating>=9) {
context.write(new Text(val.name), val); //write null if it is not required
}
} catch(Exception ex) {
ex.printStackTrace();
}
}
}
public static class AvgRatingReducer extends Reducer<Text,Contestant,Text,DoubleWritable> {
private DoubleWritable result = new DoubleWritable(0.0);
public void reduce(Text key, Iterable<Contestant> values, Context context) throws IOException, InterruptedException {
double sum = 0.0;
int count = 0;
for (Contestant val : values) {
sum += val.rating;
count++;
}
if(count>0) {
result.set(sum/(double)count);
}
System.out.println(result);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "AvgMRJob"); //configuration and job name
job.setJarByClass(Filter3.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(AvgRatingReducer.class);
job.setReducerClass(AvgRatingReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
Path inPath = new Path(args[0]);
Path outPath = new Path(args[1]);
outPath.getFileSystem(conf).delete(outPath,true);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
使用的可寫的對象是在這裏:
public class Contestant implements Writable {
long id;
String name;
double rating;
public Contestant() {}
public Contestant(long id, String name, double rating) {
this.id = id;
this.name = name;
this.rating = rating;
}
public Contestant(String id, String name, String rating) {
try {
this.id = Long.parseLong(id.trim());
} catch(Exception ex) {
}
this.name = name;
try {
this.rating = Double.parseDouble(rating.trim());
} catch(Exception ex) {
}
}
@Override
public void readFields(DataInput inp) throws IOException {
id = inp.readLong();
name = WritableUtils.readString(inp);
rating = inp.readDouble();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(id);
WritableUtils.writeString(out, name);
out.writeDouble(rating);
}
@Override
public String toString() {
return this.id + "," + this.name + "," + this.rating;
}
}
執行將輸出寫入上下文時陷入reduce函數。我沒有得到任何錯誤/異常。它只是無限期地掛起。 我不知道是什麼問題。我遵循了MapReduce的通常程序。
注意:如果我在地圖上都寫同一類型的數據和降低 同樣的程序工作。即如果我在Map和Reduce函數中寫入(key = Text,val = Contestant)。 - 而不是在減少使用DoubleWritable!
那真是太傻了......但是,爲什麼它沒有拋出任何錯誤/異常呢? - 我在想!!! –
這個問題不是因爲這個原因。但是,如果編輯完成,程序將工作。 真正的問題是Combiners輸入鍵值對和輸出鍵值對必須相同。在我的減速器中,它是不同的,因此不能用它作爲組合器。 真正的原因是組合前提條件不能正確滿足。 –
@vivek_nk這正是我的答案所說的。 –