2013-06-20 66 views
2

到文件的值我有這樣, 一個記錄的輸入| 1 | Y, C | 0 | N, Ç| 1 | N, ð| 2 | Y, e | 1 | Y寫而無需移動到減速器

現在,在映射器中,我必須檢查第三列的值。如果是'Y',則該記錄必須直接寫入輸出文件,而不將該記錄移動到減速器或否則即'N'值記錄必須移動到減速器以供進一步處理。

因此, a | 1 | Y, ð| 2 | Y, E | 1 | Y 不應該去減速器但 C | 0 | N, ç| 1 | N 應到減速器,然後到輸出文件。

我該怎麼做?

+0

這不是那麼清楚。你只是不想運行減速機? –

+0

我不想將第三列值的記錄移動爲'Y',剩餘部分應該移至減速器。 –

+1

因此,您希望映射器將第3列'Y'的記錄直接寫入HDFS,並且第3列'N'的記錄只應寫入減速器? –

回答

2

你可能會做的是用MultipleOutputs - click here將'Y'和'N'類型的記錄分隔成兩個不同的映射器文件。

接下來,您爲兩個新生成的'Y'和'N'類型數據集運行分離作業。 對於'Y'類型,將減速器的數量設置爲0,以便減速器不被使用。而且,對於'N'類型,按照您想要使用減速器的方式進行。

希望這會有所幫助。

-1

在您的地圖功能中,您將逐行獲取輸入。按照|使用|分割作爲分隔符。 (通過使用String.split()方法是精確的) 它看起來像這樣

String[] line = value.toString().split('|'); 

訪問該陣列的第三個元素由line[2]

然後,使用一個簡單的if else陳述,發射具有N值的輸出進一步處理。

+1

if-else如何提供幫助?如果執行context.write()並且在工作中有一個reducer,它將從if或else中調用。我很抱歉,我不太明白。 – Tariq

1

看看這個工程,

public class Xxxx { 

    public static class MyMapper extends 
      Mapper<LongWritable, Text, LongWritable, Text> {   

     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {    

      FileSystem fs = FileSystem.get(context.getConfiguration()); 
      Random r = new Random();     
      FileSplit split = (FileSplit)context.getInputSplit(); 
      String fileName = split.getPath().getName();     
      FSDataOutputStream out = fs.create(new Path(fileName + "-m-" + r.nextInt()));        
      String parts[]; 
      String line = value.toString(); 
      String[] splits = line.split(","); 
      for(String s : splits) { 
       parts = s.split("\\|"); 
       if(parts[2].equals("Y")) {     
        out.writeBytes(line); 
       }else { 
        context.write(key, value); 
       } 
      } 
      out.close(); 
      fs.close(); 
     }  
    } 

    public static class MyReducer extends 
      Reducer<LongWritable, Text, LongWritable, Text> { 
     public void reduce(LongWritable key, Iterable<Text> values, 
       Context context) throws IOException, InterruptedException { 
      for(Text t : values) { 
      context.write(key, t); 
      } 
     } 
    } 

    /** 
    * @param args 
    * @throws IOException 
    * @throws InterruptedException 
    * @throws ClassNotFoundException 
    */ 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
     // TODO Auto-generated method stub 

     Configuration conf = new Configuration(); 
     conf.set("fs.default.name", "hdfs://localhost:9000"); 
     conf.set("mapred.job.tracker", "localhost:9001"); 
     Job job = new Job(conf, "Xxxx"); 
     job.setJarByClass(Xxxx.class); 
     Path outPath = new Path("/output_path"); 
     job.setMapperClass(MyMapper.class); 
     job.setReducerClass(MyReducer.class); 
     FileInputFormat.addInputPath(job, new Path("/input.txt")); 
     FileOutputFormat.setOutputPath(job, outPath); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
}