I am working on a three-node Hadoop MapReduce problem that is meant to take a 200,000-line input.csv file with dates and point values as its headers (gist of 25 lines of sample data: https://gist.githubusercontent.com/PatMulvihill/63effd90411efe858330b54a4111fadb/raw/4033695ba5ca2f439cfd1512358425643807d83b/input.csv). The program should find any point value that is not one of the following: 200, 400, 600, 800, 1000, 1200, 1600, or 2000. That point value should be the value. The key should be the year from the date that comes before that point value. For example, if we have the data

2000-05-25, 400
2001-10-12, 650
2001-04-09, 700

the key-value pairs that should be sent to the reducer are <2001, 650> and <2001, 700>. The reducer should then take the average of all values for each given year and write those key-value pairs to the HDFS /out path I specify. The program compiles, but it never writes any output. I would like to know why, and what I can do to fix it.
The full code is below:
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class JeopardyMR {

  public static class SplitterMapper extends Mapper<Object, Text, Text, IntWritable> {

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      // Convert the CSV line (of type Text) to a String
      String CSVString = value.toString();
      // Split the string at each comma, creating a List with the different attributes in each index.
      // Sometimes the questions will be split into multiple elements because they contain commas, but for the
      // way we will be parsing the CSVs, it doesn't matter.
      List<String> items = Arrays.asList(CSVString.split("\\s*,\\s*"));
      // Loop through all the elements in the CSV.
      // Start i at 3 to ensure that we do not parse a point value whose year is absent from the data set.
      // We can end the loop at items.size() without truncating the last 3 items because if we have a point value, we know
      // that the corresponding year is in the items before it, not after it.
      // We will miss 1 or 2 data points because of this, but it shouldn't matter much given the size of our data set
      // and the fact that a value has a low probability of actually being a Daily Double wager.
      for (int i = 3; i < items.size(); i++) {
        // We want a String version of the item being evaluated so that we can see if it matches the regex
        String item = items.get(i);
        if (item.matches("^\\d{4}\\-(0?[1-9]|1[012])\\-(0?[1-9]|[12][0-9]|3[01])$")) {
          // Make sure that we don't get an out-of-bounds error when trying to access the next item
          if (i + 1 >= items.size()) {
            break;
          } else {
            // The wagerStr should always be the item after a valid air date
            String wagerStr = items.get(i + 1);
            int wager = Integer.parseInt(wagerStr);
            // If a wager isn't one of the following values, assume that it is a Daily Double wager
            if (wager != 200 && wager != 400 && wager != 600 && wager != 800 && wager != 1000 && wager != 1200 && wager != 1600 && wager != 2000) {
              // If we know that a point value of a question is in fact a Daily Double wager, find the year the Daily Double happened.
              // The year will always be the first 4 digits of a valid date formatted YYYY-MM-DD.
              char[] airDateChars = item.toCharArray();
              String year = "" + airDateChars[0] + airDateChars[1] + airDateChars[2] + airDateChars[3];
              // Output the following key-value pair: <year, wager>
              context.write(new Text(year), new IntWritable(wager));
            }
          }
        }
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0, count = 0;
      for (IntWritable val : values) {
        sum += val.get();
        count++;
      }
      int avg = sum / count;
      result.set(avg);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "jeopardy daily double wagers by year");
    job.setJarByClass(JeopardyMR.class);
    job.setMapperClass(SplitterMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
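For reference, here is a minimal, Hadoop-free sketch of the same parsing logic (the split, date regex, and wager filter are copied from the mapper above) that can be run locally against a few lines shaped like the example at the top of the question; the sample lines and the ParseCheck class name are only illustrative:

import java.util.Arrays;
import java.util.List;

public class ParseCheck {
    public static void main(String[] args) {
        // Illustrative lines in the "date,value" shape from the example above; swap in real rows from input.csv
        String[] lines = { "2000-05-25,400", "2001-10-12,650", "2001-04-09,700" };
        for (String line : lines) {
            List<String> items = Arrays.asList(line.split("\\s*,\\s*"));
            // Note: the real mapper starts its loop at i = 3, so a two-element line like these would emit nothing there
            for (int i = 0; i < items.size(); i++) {
                String item = items.get(i);
                if (item.matches("^\\d{4}\\-(0?[1-9]|1[012])\\-(0?[1-9]|[12][0-9]|3[01])$") && i + 1 < items.size()) {
                    int wager = Integer.parseInt(items.get(i + 1));
                    if (wager != 200 && wager != 400 && wager != 600 && wager != 800
                            && wager != 1000 && wager != 1200 && wager != 1600 && wager != 2000) {
                        System.out.println("<" + item.substring(0, 4) + ", " + wager + ">");
                    }
                }
            }
        }
    }
}

Running this prints <2001, 650> and <2001, 700>, matching the expected reducer input; the open question is whether the same logic still emits anything once the loop starts at i = 3 on the real rows.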
The terminal output from the successful compile and run can be found here: https://gist.github.com/PatMulvihill/40b3207fe8af8de0b91afde61305b187
I am very new to Hadoop MapReduce, and I have probably made a very silly mistake. I adapted this code from the code found here: https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
Please let me know if I am leaving out any helpful information. Any help would be appreciated! Thank you.
Try simplifying the example (don't do any filtering) to check whether the data gets written at all. – Serhiy
There are a few things you can check to see where the problem lies: Are you reading anything in? Check the map input records counter; if it is zero, check the input path. Are you writing anything out from the mapper (probably not)? Check the map output records counter. Try printing the contents when you check the pattern match, to see whether anything is printed. – vefthym
@vefthym I checked the map input records counter, and it equals the number of input records, as expected. I confirmed that the input path is correct by reformatting and re-adding the input file to HDFS. I think my problem is that I am not writing anything from the mapper to the reducer; the map output records counter is 0, and so are the bytes written. Here is the terminal output from my program's successful run: https://gist.github.com/PatMulvihill/40b3207fe8af8de0b91afde61305b187 –
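Following vefthym's suggestion, one way to see where records are being dropped is to add a few custom counters (or stderr prints) inside the map method; the DEBUG group and counter names below are made up for illustration, and the surrounding calls mirror the existing mapper:

// Inside SplitterMapper.map(), purely for debugging (counter group/names are illustrative):
List<String> items = Arrays.asList(value.toString().split("\\s*,\\s*"));
context.getCounter("DEBUG", "LINES_SEEN").increment(1);
context.getCounter("DEBUG", "ITEMS_" + items.size()).increment(1);
for (int i = 3; i < items.size(); i++) {
    if (items.get(i).matches("^\\d{4}\\-(0?[1-9]|1[012])\\-(0?[1-9]|[12][0-9]|3[01])$")) {
        context.getCounter("DEBUG", "DATE_MATCHES_AT_INDEX_3_OR_LATER").increment(1);
    }
}

These counters show up alongside the built-in ones in the job's terminal output. If LINES_SEEN is non-zero but DATE_MATCHES_AT_INDEX_3_OR_LATER stays at zero (for example because most rows split into fewer than four items, or the date never lands at index 3 or later), the context.write call can never be reached, which would explain the zero map output records.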