I am trying to find the median in Hadoop. The job fails with a Java heap space error while running MapReduce:
16/03/02 02:46:13 INFO mapreduce.Job: Task Id : attempt_1456904182817_0001_r_000412_0, Status : FAILED
Error: Java heap space
I have gone through many posts addressing similar problems, and also took help from related answers, but nothing worked.
I tried the following possible solutions:
- Increasing the Java heap size, as suggested in the posts above (a sketch of setting these memory properties from the driver is shown after this list).
- Increasing the size of the containers by changing the following property in yarn-site.xml:
yarn.scheduler.minimum-allocation-mb 1024
- Increasing the number of reducers to a larger value like this:
job.setNumReduceTasks(1000);
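For reference, here is roughly what I mean by the first two tweaks, expressed as driver-side configuration instead of editing yarn-site.xml. This is only a minimal sketch: the property names (mapreduce.map.memory.mb, mapreduce.map.java.opts, mapreduce.reduce.memory.mb, mapreduce.reduce.java.opts) are standard Hadoop 2.x settings, but the values here are placeholders, not the exact ones I used.

Configuration conf = new Configuration();
// Container size requested for each reduce task, in MB (placeholder value)
conf.setInt("mapreduce.reduce.memory.mb", 4096);
// JVM heap for each reduce task, kept below the container size (placeholder value)
conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");
// The same idea for map tasks, if the mappers also run short of heap
conf.setInt("mapreduce.map.memory.mb", 2048);
conf.set("mapreduce.map.java.opts", "-Xmx1638m");
// The Configuration must be populated before the Job is created from it
Job job = Job.getInstance(conf, "Median for all keys");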
However, none of the above worked for me, which is why I am posting this. I know that computing a median is not a good fit for Hadoop, but can anyone suggest any solution that might help?
java version "1.8.0_60"
Hadoop version is 2.x
I have a 10-node cluster with 8 GB of RAM and 80 GB of hard disk on each node.
Here is the entire code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.rank.Median;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class median_all_keys {
//Mapper
public static class map1 extends Mapper<LongWritable,Text,Text,DoubleWritable>{
public void map(LongWritable key, Text value, Context context)
throws IOException,InterruptedException{
String[] line= value.toString().split(",");
double col1=Double.parseDouble(line[6]);
double col2=Double.parseDouble(line[7]);
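// Each input record fans out into 14 key/value pairs below: the three single keys,
// the three key pairs, and the key triple, each emitted once for column 6 and once for column 7.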
context.write(new Text("Key0"+"_"+line[0]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key1"+"_"+line[1]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key1"+"_"+line[1]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key2"+"_"+line[2]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key2"+"_"+line[2]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key1"+"_"+line[1] +","+"key2"+"_"+ line[2]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key1"+"_"+line[1] +","+"key2"+"_"+ line[2]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key0"+"_"+line[0] +","+"key2"+"_"+ line[2]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0] +","+"key2"+"_"+ line[2]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+","+"key2"+"_"+line[2]+"_"+"Index:6"),new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+","+"key2"+"_"+line[2]+"_"+"Index:7"),new DoubleWritable(col2));
}
}
//Reducer
public static class sum_reduce extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
// HashMap<String,List<Float>> median_map = new HashMap<String,List<Float>>();
@SuppressWarnings({ "unchecked", "rawtypes" })
public void reduce(Text key,Iterable<DoubleWritable> value, Context context)
throws IOException,InterruptedException{
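// Buffer every value for this key in memory; for keys with many values this list can grow very large.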
List<Double> values = new ArrayList<>();
for (DoubleWritable val: value){
values.add(val.get());
}
double res = calculate(values);
context.write(key, new DoubleWritable(res));
}
public static double calculate(List<Double> values) {
DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics();
for (Double value : values) {
descriptiveStatistics.addValue(value);
}
return descriptiveStatistics.getPercentile(50);
}
}
public static void main(String[] args) throws Exception {
Configuration conf= new Configuration();
Job job = new Job(conf,"Sum for all keys");
//Driver
job.setJarByClass(median_all_keys.class);
//Mapper
job.setMapperClass(map1.class);
//Reducer
job.setReducerClass(sum_reduce.class);
//job.setCombinerClass(TestCombiner.class);
//Output key class for Mapper
job.setMapOutputKeyClass(Text.class);
//Output value class for Mapper
job.setMapOutputValueClass(DoubleWritable.class);
//Output key class for Reducer
job.setOutputKeyClass(Text.class);
job.setNumReduceTasks(1000);
//Output value class from Reducer
job.setOutputValueClass(DoubleWritable.class);
//Input Format class
job.setInputFormatClass(TextInputFormat.class);
//Final Output Format class
job.setOutputFormatClass(TextOutputFormat.class);
//Path variable
Path path = new Path(args[1]);
//input/output path
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
path.getFileSystem(conf).delete(path);
//exiting the job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
I don't see any issue with the hardware specs. If you can post your code, someone could help. Make sure you are not using any Java collection objects to store data in the mappers/reducers, which can cause the Java heap error. – srikanth
@srikanth I have added the code. –
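A minimal sketch of what the comment above suggests, assuming an exact median is still required: feed each value straight into DescriptiveStatistics instead of copying it into a separate ArrayList first. The class name median_reduce is made up for illustration, and DescriptiveStatistics itself still buffers every value internally, so this only removes the extra copy rather than the underlying per-key memory cost.

public static class median_reduce extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // Add each value directly; no intermediate List<Double> is kept
        DescriptiveStatistics stats = new DescriptiveStatistics();
        for (DoubleWritable val : values) {
            stats.addValue(val.get());
        }
        // 50th percentile = median of all values seen for this key
        context.write(key, new DoubleWritable(stats.getPercentile(50)));
    }
}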