2015-09-28 24 views
0

這是我的代碼,它使用命令列參數(args)把參數傳遞給Hadoop MapReduce的RecordReader(記錄讀取器):

import java.io.File; 
import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.poi.hwpf.HWPFDocument; 
import org.apache.poi.hwpf.extractor.WordExtractor; 



public class Docsparser { 
     private static String Delimiter; 

    public static class DocsInputFormat extends FileInputFormat<Text, Text> { 

      @Override 
      public RecordReader<Text, Text> createRecordReader(InputSplit split, 
      TaskAttemptContext context) throws IOException, InterruptedException { 
       return new DocsLineRecordReader(); 
      } 
    } 

      public static class DocsLineRecordReader extends RecordReader<Text, Text> { 

       private Text key = new Text(); 
       private Text value = new Text(); 
       private int currentword = 0; 
       private String fileline; 
       private File file = null; 
       private String line; 
       private HWPFDocument document; 
       private WordExtractor extractor = null; 
       private String[] filedata; 
       StringBuilder sb = new StringBuilder(); 

       @Override 
       public void initialize(InputSplit split, TaskAttemptContext context) 
         throws IOException, InterruptedException { 

        FileSplit fileSplit = (FileSplit) split; 
        final Path file = fileSplit.getPath(); 
        Configuration conf = context.getConfiguration(); 
        FileSystem fs = file.getFileSystem(conf); 
        FSDataInputStream filein = fs.open(fileSplit.getPath()); 

        String Delim = conf.get("Delim"); 
         if (filein != null) 
         { 
          HWPFDocument document = new HWPFDocument(filein); 
          extractor = new WordExtractor(document); 
          fileline = extractor.getText(); 

          filedata = fileline.split(Delim); 
         } 
        } 


       @Override 
       public boolean nextKeyValue() throws IOException, InterruptedException 
       { 

        if (key == null) { 
         key = new Text(); 
        } 

        if (value == null) { 
         value = new Text(); 
        } 
        if(currentword < filedata.length) 
        { 
         for (currentword=0;currentword < filedata.length; currentword++) 
         {    
          sb.append(filedata[currentword] +","); 
          line = sb.toString();    

         } 

         key.set(line); 
         value.set(""); 
         return true; 
        } 
        else 
        { 
         key = null; 
         value = null; 
         return false; 
        } 

       } 

       @Override 
       public Text getCurrentKey() throws IOException, InterruptedException { 
        return key; 
       } 

       @Override 
       public Text getCurrentValue() throws IOException, InterruptedException { 
        return value; 
       } 

       @Override 
       public float getProgress() throws IOException, InterruptedException { 
        return (100.0f/filedata.length * currentword)/100.0f; 
       } 

      @Override 
       public void close() throws IOException { 

       } 
      } 


    public static class Map extends Mapper<Text, Text, Text, Text>{ 

     public void map(Text key, Text value, Context context) throws IOException, InterruptedException 
     { 

        context.write(key,value); 

     } 
    } 

     public static void main(String[] args) throws Exception 
     { 

       Configuration conf = new Configuration(); 
       Job job = new Job(conf, "Docsparser"); 
       job.setJarByClass(Docsparser.class); 


       job.setOutputKeyClass(Text.class); 
       job.setOutputValueClass(Text.class); 

       job.setMapperClass(Map.class); 
       job.setNumReduceTasks(0); 

       FileInputFormat.setInputPaths(job, new Path(args[0])); 
       FileOutputFormat.setOutputPath(job, new Path(args[1])); 

       Delimiter = args[2].toString(); 
       conf.set("Delim",Delimiter); 


       job.setInputFormatClass(DocsInputFormat.class); 
       job.setOutputFormatClass(TextOutputFormat.class); 

       System.exit(job.waitForCompletion(true) ? 0 : 1); 

     } 

} 

異常詳細信息:

15/09/28 03:50:04 INFO mapreduce.Job: Task Id : attempt_1443193152998_2319_m_000000_2, Status : FAILED Error: java.lang.NullPointerException at java.lang.String.split(String.java:2272) at java.lang.String.split(String.java:2355) at com.nielsen.grfe.Docsparser$DocsLineRecordReader.initialize(Docsparser.java:66) at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:548) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:786) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

+0

歡迎SO。這不是錯誤轉儲。請在此提出具體問題。 – Adriaan

回答

0

所有的配置變量必須在初始化Job類之前設置,因為Job在建立時會複製一份Configuration,之後對conf的修改不會傳到任務中。請把下面的前兩行移動到最後一行(建立Job的語句)之前:

Delimiter = args[2].toString(); 
    conf.set("Delim",Delimiter); 

Job job = new Job(conf, "Docsparser"); 
+0

謝謝你Vignesh,它可以運行了..... :)非常感謝... – Barath

+0

我很高興它的工作.. :) –

0

NullPointerException發生在對字串`fileline`調用`split`方法時。我懷疑你沒有設置"Delim"配置值,因此你的變量`Delim`為`null`。

+0

謝謝你的回覆,但是我已經在main方法中把它設置到配置裡了:Delimiter = args[2].toString(); conf.set("Delim", Delimiter); – Barath

+0

您可以通過'initialize'方法中的'Delim!= null'來檢查這個值是否真的被運送到'DocsLineRecordReader'? –

+0

是的,Delim是空的,我得到空指針異常,我認爲delim沒有得到發貨,但我不知道,爲什麼會拋出這個錯誤,雖然我已經在主方法中設置了Delim值@配置... – Barath