2016-12-07 82 views
0

Hadoop map-reducer does not write any output

I'm working on a three-node Hadoop MapReduce job that takes a 200,000-line input.csv file with dates and point values (gist with a 25-row sample of the data: https://gist.githubusercontent.com/PatMulvihill/63effd90411efe858330b54a4111fadb/raw/4033695ba5ca2f439cfd1512358425643807d83b/input.csv). The program should find every point value that is not one of the following: 200, 400, 600, 800, 1000, 1200, 1600, or 2000. That point value should be the value; the key should be the year from the date that precedes the point value. For example, given the data

2000-05-25,400
2001-10-12,650
2001-04-09,700

the key-value pairs sent to the reducer should be <2001, 650> and <2001, 700>. The reducer should then take the average of all the values for each given year and write those key-value pairs to the HDFS /out path I specify. The program compiles fine, but never writes any output. I'd like to know why, and what I can do to fix it. The full code is below:

import java.io.IOException; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class JeopardyMR { 

public static class SplitterMapper extends Mapper <Object, Text, Text, IntWritable> { 

    public void map (Object key, Text value, Context context) throws IOException, InterruptedException { 
     // Convert the CSVString (of type Text) to a string 
     String CSVString = value.toString(); 
     // Split the string at each comma, creating an ArrayList with the different attributes in each index. 
     // Sometimes the questions will be split into multiple elements because they contain commas, but for the 
     // way we will be parsing the CSV's, it doesn't matter. 
     List<String> items = Arrays.asList(CSVString.split("\\s*,\\s*")); 
     // Loop through all the elements in the CSV 
     // Start i at 3 to ensure that you do not parse a point value that has a year absent from the data set. 
     // We can end the loop at items.size() w/o truncating the last 3 items because if we have a point value, we know 
     // that the corresponding year is in the items before it, not after it. 
     // We will miss 1 or 2 data points because of this, but it shouldn't matter too much because of the magnitude of our data set 
     // and the fact that a value has a low probability of actually being a daily double wager. 
     for (int i = 3; i < items.size(); i++) { 
      // We want a String version of the item that is being evaluated so that we can see if it matches the regex 
      String item = items.get(i); 
      if (item.matches("^\\d{4}\\-(0?[1-9]|1[012])\\-(0?[1-9]|[12][0-9]|3[01])$")) { 
       // Make sure that we don't get an out of bounds error when trying to access the next item 
       if (i + 1 >= items.size()) { 
        break; 
       } else { 
        // the wagerStr should always be the item after a valid air date 
        String wagerStr = items.get(i + 1); 
        int wager = Integer.parseInt(wagerStr); 
        // if a wager isn't the following values, assume that is a daily double wager 
        if (wager != 200 && wager != 400 && wager != 600 && wager != 800 && wager != 1000 && wager != 1200 && wager != 1600 && wager != 2000) { 
         // if we know that a point value of a question is in fact a daily double wager, find the year that the daily double happened 
         // the year will always be the first 4 digits of a valid date formatted YYYY-MM-DD 
         char[] airDateChars = item.toCharArray(); 
         String year = "" + airDateChars[0] + airDateChars[1] + airDateChars[2] + airDateChars[3]; 

         // output the follow key-value pair: <year, wager> 
         context.write(new Text(year), new IntWritable(wager)); 
        } 
       } 

      } 
     } 
    } 
} 

public static class IntSumReducer extends Reducer <Text, IntWritable, Text, IntWritable> { 

    private IntWritable result = new IntWritable(); 
    public void reduce (Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 
     int sum = 0, count = 0; 
     for (IntWritable val : values) { 
      sum += val.get(); 
      count++; 
     } 
     int avg = sum/count; 
     result.set(avg); 
     context.write(key, result); 
    } 
} 

public static void main (String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Job job = Job.getInstance(conf, "jeopardy daily double wagers by year"); 
    job.setJarByClass(JeopardyMR.class); 
    job.setMapperClass(SplitterMapper.class); 
    job.setCombinerClass(IntSumReducer.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 

The terminal output from a successful compile and run can be found here: https://gist.github.com/PatMulvihill/40b3207fe8af8de0b91afde61305b187 I'm very new to Hadoop MapReduce, and I've probably made a very silly mistake. I adapted this code from the code found here: https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html If I'm missing any useful information, please let me know. Any help would be appreciated! Thanks.

+2

Try simplifying the example (don't do any filtering) to check whether the data gets written at all. – Serhiy

+0

There are a few things you can check to see where the problem is. Are you reading anything? Check the map input records counter; if it's zero, check the input path. Are you writing anything from the mapper (probably not)? Check the map output records counter. Try printing the contents where you check the pattern match, to see if anything prints. – vefthym

+0

@vefthym I checked the map input records counter and it equals the number of input records, as expected. I confirmed the input path is correct by reformatting and re-adding the input file to HDFS. I think my problem is that I'm not writing anything from the mapper to the reducer: the map output records counter is 0, and so are the bytes written. Here is the terminal output from a successful compile and run of my program: https://gist.github.com/PatMulvihill/40b3207fe8af8de0b91afde61305b187 –

Answers

1

I checked, and I believe items.size() is two. Remember that the input to a map task is the file split line by line, and the map function is executed once per line. Once each line is split on the comma, the item count is two, so the loop condition i < items.size() (with i starting at 3) is never true and the loop body never runs for any line. You can check the map output bytes counter to confirm that no data is being written. EDIT: Replace the map code with this:

public void map (Object key, Text value, Context context) throws IOException, InterruptedException { 
    String CSVString = value.toString(); 
    String[] yearsValue = CSVString.split("\\s*,\\s*"); 
    if (yearsValue.length == 2) { 
        int wager = Integer.parseInt(yearsValue[1]); 
        if (wager != 200 && wager != 400 && wager != 600 && wager != 800 && wager != 1000 && wager != 1200 && wager != 1600 && wager != 2000) { 
            char[] airDateChars = yearsValue[0].toCharArray(); 
            String year = "" + airDateChars[0] + airDateChars[1] + airDateChars[2] + airDateChars[3]; 
            context.write(new Text(year), new IntWritable(wager)); 
        } 
    } else { 
        System.out.println(CSVString); 
    } 
}
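To see concretely why the original mapper never emits anything, here is a minimal stand-alone sketch (plain Java, no Hadoop required; the class name is just for illustration) of what a single map() call actually receives:

```java
public class SplitDemo {
    public static void main(String[] args) {
        // map() is called once per line of input.csv, so `value` holds a single row:
        String line = "2004-12-31,200";
        String[] items = line.split("\\s*,\\s*");

        // A single row splits into exactly two fields: the date and the point value.
        System.out.println(items.length);   // prints 2

        // The original loop, `for (int i = 3; i < items.size(); i++)`, starts at 3,
        // so with only 2 items its body never executes and nothing is ever written.
    }
}
```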
+0

Thanks @vahid. I retracted my "not an answer" flag and upvoted. However, your answer is still wrong in saying not to use a combiner: using one in this case is not only correct but also very good practice. Please consider removing that part of the answer. – vefthym

+0

@vahid Any idea why 'items.size()' would only be two? My understanding was that when I cast the 'values' parameter to a string, I would be creating one huge string containing all of the input. Am I misunderstanding something? Also, how did you determine that 'items.size()' is two? My print statements don't work in the Hadoop application, and they don't end up in the log files. –

+0

@Pat Mulvihill There are two concepts here: 1) the map task and 2) the map function. MapReduce partitions the input data based on the configuration and feeds it to the map tasks; each map task then splits its input partition into lines and passes each line to the map function. You write the map function, not the map task. – vahid

0

I actually fixed this by converting my .csv file to a .txt file. That's not really a solution to the problem, but it's what got my code working, and now I can move on to understanding why it was a problem. Also, this might help someone in the future!