2016-06-22 38 views
0

我有一個用例:需要使用 Java MapReduce 從文件中讀取奇數行,也就是用 MapReduce 實現只讀取奇數記錄。

但是按照 InputFormat 類的行為,只有「\n」會被當作行終止符。例如,期望的效果如下:

輸入:
桑帕特
庫馬爾
Hadoop的
mapredue

OUTPUT:
桑帕特
Hadoop的

+0

你有沒有試過把換行符改成空格? - http://stackoverflow.com/questions/12118836/how-to-read-text-source-in-hadoop-separated-by-special-character - 和 - https://amalgjose.com/2013/05/27 /自定義文本輸入格式記錄定界符換的Hadoop /' –

回答

0

您也可以用下面這種方式,根據您的輸入得到所需的輸出(不需要編寫自定義輸入/輸出格式):

輸入:

sampat1 kumar2 hadoop3 mapredue4 sampat1 kumar2 hadoop3 mapredue4 sampat1 kumar2 hadoop3 mapredue4 sampat1 kumar2 hadoop3 mapredue4 sampat1 kumar2 hadoop3 mapredue4 

輸出:

sampat1 hadoop3 sampat1 hadoop3 sampat1 hadoop3 sampat1 hadoop3 sampat1 hadoop3 

代碼:

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 


/**
 * Hadoop job that, for every input record, keeps the tokens at odd positions
 * (1st, 3rd, 5th, ...) of the space-separated record and drops the rest.
 *
 * <p>Usage: {@code OddLine <input path> <output path>}. An existing output
 * path is deleted recursively before the job is submitted.
 */
public class OddLine {

    /**
     * Mapper that emits, for each input value, the tokens at even array
     * indices (0, 2, 4, ...) after splitting the value on a single space.
     * Each kept token is followed by one space; the output key is always
     * the empty string.
     */
    public static class OddLineMapper extends Mapper<Object, Text, Text, Text> {

        /**
         * Processes one input record.
         *
         * @param key     input key; not used
         * @param value   the record text to filter
         * @param context sink for the (empty key, filtered text) pair
         * @throws IOException          if writing the output pair fails
         * @throws InterruptedException if the write is interrupted
         */
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            String[] tokens = value.toString().split(" ");

            // The builder must be local: the framework reuses one Mapper
            // instance for many map() calls, so a field-level builder would
            // carry the tokens of earlier records into every later output.
            StringBuilder kept = new StringBuilder();
            for (int i = 0; i < tokens.length; i += 2) {
                kept.append(tokens[i]).append(' ');
            }

            context.write(new Text(""), new Text(kept.toString()));
        }
    }

    /**
     * Configures and runs the job, then exits with status 0 on success and
     * 1 on failure (2 when the command line is incomplete).
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the output path cannot be cleaned up or the job
     *                   cannot be submitted or monitored
     */
    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            System.err.println("Usage: OddLine <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf, "Get odd words");
        job.setJarByClass(OddLine.class);
        job.setMapperClass(OddLineMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path dstFilePath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, dstFilePath);

        // Remove a stale output directory up front. A failure here is not
        // swallowed: the job cannot succeed while the old output still
        // exists, so surfacing the real cause is more useful than a later,
        // less specific failure at submission time.
        FileSystem fs = dstFilePath.getFileSystem(conf);
        if (fs.exists(dstFilePath) && !fs.delete(dstFilePath, true)) {
            throw new IOException("Could not delete existing output path: " + dstFilePath);
        }

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}