
I have recently started learning Hadoop (I am using Hadoop 2.7.3 and Java 1.7.0.89). I wrote some code to analyze TV shows and their comments on different websites, and I subclassed FileInputFormat with my own implementation. But when I run the code in Eclipse I get a number of exceptions. I tried debugging in Eclipse and can only tell that something is wrong in the mapper or reducer, but I don't know where: the MapReduce job with the rewritten FileInputFormat produces no output.

Here is a sample of the input data; the first field is the show's nickname and the second is the website code:

truelove 3 3678 0 0 0 1 
truelove 2 39155 0 0 173 438 
truelove 1 142208 1 2 1 1 
truelove 1 142208 1 2 1 1 
truelove 1 142208 1 2 1 1 
frink 2 950 0 0 0 0 
frink 2 800 0 0 0 0 
daughter 4 4489850 0 0 0 0 
daughter 4 1161 0 0 0 0 
princess 2 33593 0 0 0 3 
princess 2 36118 0 0 0 2 
princess 2 38608 0 0 0 1 
princess 3 2542 0 0 0 0 
princess 1 35322 2 4 0 1 

Then I rewrote the InputFormat.

The custom value type:

 package com.hadoop.mapreduce; 

    import java.io.DataInput; 
    import java.io.DataOutput; 
    import java.io.IOException; 

    import org.apache.hadoop.io.WritableComparable; 


    /* Website codes 1-5 mean: 1=youku, 2=souhu, 3=tudou, 4=aiqiyi, 5=xunlei 
    princess 2 33593 0 0 0 3 
    princess 2 36118 0 0 0 2 
    princess 2 38608 0 0 0 1 
    princess 3 2542 0 0 0 0 
    princess 1 35322 2 4 0 1*/ 
    public class TVplaydata implements WritableComparable<Object>{ 
     //private String tvname; 

     private int tvplaynum; 
     private int tvfavorite; 
     private int tvcomment; 
     private int tvdown; 
     private int tvvote; 
    public TVplaydata(){} 
    public void set(int tvplaynum,int tvfavorite,int tvcomment,int tvdown,int tvvote){ 
     this.tvplaynum = tvplaynum; 
     this.tvfavorite = tvfavorite; 
     this.tvcomment = tvcomment; 
     this.tvdown = tvdown; 
     this.tvvote = tvvote; 
    } 
    //source get set 
    public void setTvpalynum(int tvplaynum) { 
     this.tvplaynum = tvplaynum; 
    } 
    public int getTvpalynum() { 
     return tvplaynum; 

    } 

    public int getTvfavorite() { 
     return tvfavorite; 
    } 
    public void setTvfavorite(int tvfavorite) { 
     this.tvfavorite = tvfavorite; 
    } 
    public int getTvcomment() { 
     return tvcomment; 
    } 
    public void setTvcomment(int tvcomment) { 
     this.tvcomment = tvcomment; 
    } 
    public int getTvdown() { 
     return tvdown; 
    } 
    public void setTvdown(int tvdown) { 
     this.tvdown = tvdown; 
    } 
    public int getTvvote() { 
     return tvvote; 
    } 
    public void setTvvote(int tvvote) { 
     this.tvvote = tvvote; 
    } 
     @Override 

     public void readFields(DataInput in) throws IOException { 
      // TODO Auto-generated method stub 
      tvplaynum = in.readInt(); 
      tvfavorite = in.readInt(); 
      tvcomment = in.readInt(); 
      tvdown = in.readInt(); 
      tvvote = in.readInt(); 
     } 

     @Override 

     public void write(DataOutput out) throws IOException { 
      // TODO Auto-generated method stub 
      out.writeInt(tvplaynum); 
      out.writeInt(tvfavorite); 
      out.writeInt(tvcomment); 
      out.writeInt(tvdown); 
      out.writeInt(tvvote); 
     } 

     @Override 
     public int compareTo(Object o) { 
      // TODO Auto-generated method stub 
      return 0; 
     } 

    } 

Then the rewritten InputFormat:

package com.hadoop.mapreduce; 
import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.util.LineReader; 



class PlayinputFormat extends FileInputFormat<Text, TVplaydata>{ 

    @Override 
    public RecordReader<Text, TVplaydata> createRecordReader(InputSplit input, TaskAttemptContext context) 
      throws IOException, InterruptedException { 
     // TODO Auto-generated method stub 

     return new TvplayRecordReader(); 
    } 
    class TvplayRecordReader extends RecordReader<Text, TVplaydata>{ 

     public LineReader in; 
     public Text lineKey; 
     public TVplaydata lineValue; 
     public Text line; 

     @Override 
     public void close() throws IOException { 
      // TODO Auto-generated method stub 
      if(in !=null){ 
       in.close(); 
      } 
     } 

     @Override 
     public Text getCurrentKey() throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      return lineKey; 
     } 

     @Override 
     public TVplaydata getCurrentValue() throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      return lineValue; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      return 0; 
     } 

     @Override 
     public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      FileSplit split=(FileSplit)input; 
      Configuration job=context.getConfiguration(); 
      Path file=split.getPath(); 
      FileSystem fs=file.getFileSystem(job); 

      FSDataInputStream filein=fs.open(file); //open 
      in=new LineReader(filein,job); 
      line=new Text(); 
      lineKey=new Text(); 
      lineValue = new TVplaydata(); 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      // TODO Auto-generated method stub 
      int linesize=in.readLine(line); 
      if(linesize==0) return false; 

      String[] pieces = line.toString().split("\t"); 
      if(pieces.length != 7){ 
       throw new IOException("Invalid record received"); 
      } 


      lineKey.set(pieces[0]+"\t"+pieces[1]); 
      lineValue.set(Integer.parseInt(pieces[2]),Integer.parseInt(pieces[3]),Integer.parseInt(pieces[4]) 
        ,Integer.parseInt(pieces[5]),Integer.parseInt(pieces[6])); 
      return true; 
     } 
    } 
} 

Finally, the driver with the mapper & reducer:

package com.hadoop.mapreduce; 

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 



public class TVPlay extends Configured implements Tool{ 
public static void main(String[] args) throws Exception { 
    String[] paths = {"hdfs://wang:9000/mapreduce/tvplay.txt","hdfs://wang:9000/mapreduce/tvout"}; 
    int ec = ToolRunner.run(new Configuration(), new TVPlay(), paths); 
    System.exit(ec); 
} 


     //mapper 
public class TVmapper extends Mapper<Text, TVplaydata, Text, TVplaydata> { 
    public void map(Text key,TVplaydata value, Context context) throws IOException, InterruptedException { 
       context.write(key, value); 
      } 
     } 
public class TVreducer extends Reducer<Text, TVplaydata, Text, Text>{ 
      private Text m_key = new Text(); 
      private Text m_value = new Text(); 
      private MultipleOutputs<Text, Text> mos; 
      protected void setup(Context context) throws IOException, 
        InterruptedException { 
       mos = new MultipleOutputs<Text, Text>(context); 
      } 
     public void reduce(Text key,Iterable<TVplaydata> values, Context context) throws IOException, InterruptedException{ 

       int tvplaynum = 0; 
       int tvfavorite = 0; 
       int tvcomment = 0; 
       int tvvote = 0; 
       int tvdown = 0; 
      for (TVplaydata tv:values) { 
       tvplaynum += tv.getTvpalynum(); 
       tvfavorite += tv.getTvfavorite(); 
       tvcomment += tv.getTvcomment(); 
       tvvote += tv.getTvvote(); 
       tvdown += tv.getTvdown(); 
       } 

      String[] records = key.toString().split("\t"); 

      String source = records[1]; 
      m_key.set(records[0]); 
      m_value.set(tvplaynum+"\t"+tvfavorite+"\t"+tvcomment+"\t"+tvdown+"\t"+tvvote); 

      if(source.equals("1")){ 
       mos.write("youku", m_key, m_value); 
      }else if (source.equals("2")) { 
       mos.write("souhu", m_key, m_value); 
      }else if (source.equals("3")) { 
       mos.write("tudou",m_key, m_value); 
      }else if (source.equals("4")) { 
       mos.write("aiqiyi", m_key, m_value); 
      }else if (source.equals("5")) { 
       mos.write("xunlei", m_key, m_value); 
      }else{ 
       mos.write("other", m_key, m_value); 
      } 
     } 
     protected void cleanup(Context context) throws IOException ,InterruptedException{ 
       mos.close(); 
     } 
     } 

     @Override 
     public int run(String[] arg0) throws Exception { 
      // TODO Auto-generated method stub 
      Configuration conf = new Configuration(); 

      Path path = new Path(arg0[1]); 
      FileSystem hdfs = path.getFileSystem(conf); 
      if(hdfs.isDirectory(path)){ 
       hdfs.delete(path, true); 
      } 
      Job job = new Job(conf,"tvplay"); 
      job.setJarByClass(TVPlay.class); 
      // set InputFormatClass 
      job.setInputFormatClass(PlayinputFormat.class); 
      // set mapper 
      job.setMapperClass(TVmapper.class); 
      job.setMapOutputKeyClass(Text.class); 
      job.setMapOutputValueClass(TVplaydata.class); 
      // set reduce 
      job.setReducerClass(TVreducer.class); 
      job.setOutputKeyClass(Text.class); 
      job.setOutputValueClass(Text.class); 

      FileInputFormat.addInputPath(job, new Path(arg0[0])); 
      FileOutputFormat.setOutputPath(job,new Path(arg0[1])); 
      MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class, 
        Text.class, Text.class); 
      MultipleOutputs.addNamedOutput(job, "other", TextOutputFormat.class, 
        Text.class, Text.class); 

      return job.waitForCompletion(true)?0:1; 
     } 
} 

Below is the exception from running the main method:

2017-06-20 23:03:26,848 INFO [org.apache.hadoop.conf.Configuration.deprecation] - session.id is deprecated. Instead, use dfs.metrics.session-id 
2017-06-20 23:03:26,854 INFO [org.apache.hadoop.metrics.jvm.JvmMetrics] - Initializing JVM Metrics with processName=JobTracker, sessionId= 
2017-06-20 23:03:27,874 WARN [org.apache.hadoop.mapreduce.JobResourceUploader] - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
2017-06-20 23:03:28,186 WARN [org.apache.hadoop.mapreduce.JobResourceUploader] - No job jar file set. User classes may not be found. See Job or Job#setJar(String). 
2017-06-20 23:03:28,236 INFO [org.apache.hadoop.mapreduce.lib.input.FileInputFormat] - Total input paths to process : 1 
2017-06-20 23:03:28,639 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:1 
2017-06-20 23:03:29,389 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - Submitting tokens for job: job_local622257889_0001 
2017-06-20 23:03:30,552 INFO [org.apache.hadoop.mapreduce.Job] - The url to track the job: http://localhost:8080/ 
2017-06-20 23:03:30,556 INFO [org.apache.hadoop.mapreduce.Job] - Running job: job_local622257889_0001 
2017-06-20 23:03:30,607 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter set in config null 
2017-06-20 23:03:30,630 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - File Output Committer Algorithm version is 1 
2017-06-20 23:03:30,670 INFO [org.apache.hadoop.mapred.LocalJobRunner] - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
2017-06-20 23:03:31,562 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local622257889_0001 running in uber mode : false 
2017-06-20 23:03:31,567 INFO [org.apache.hadoop.mapreduce.Job] - map 0% reduce 0% 
2017-06-20 23:03:31,569 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Waiting for map tasks 
2017-06-20 23:03:31,571 INFO [org.apache.hadoop.mapred.LocalJobRunner] - Starting task: attempt_local622257889_0001_m_000000_0 
2017-06-20 23:03:31,667 INFO [org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter] - File Output Committer Algorithm version is 1 
2017-06-20 23:03:31,691 INFO [org.apache.hadoop.yarn.util.ProcfsBasedProcessTree] - ProcfsBasedProcessTree currently is supported only on Linux. 
2017-06-20 23:03:34,256 INFO [org.apache.hadoop.mapred.Task] - Using ResourceCalculatorProcessTree : [email protected] 
2017-06-20 23:03:34,259 INFO [org.apache.hadoop.mapred.LocalJobRunner] - map task executor complete. 
2017-06-20 23:03:34,485 WARN [org.apache.hadoop.mapred.LocalJobRunner] - job_local622257889_0001 
java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>() 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 
Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>() 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:134) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:745) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
    at java.util.concurrent.FutureTask.run(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.lang.NoSuchMethodException: com.hadoop.mapreduce.TVPlay$TVmapper.<init>() 
    at java.lang.Class.getConstructor0(Unknown Source) 
    at java.lang.Class.getDeclaredConstructor(Unknown Source) 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:128) 
    ... 8 more 
2017-06-20 23:03:34,574 INFO [org.apache.hadoop.mapreduce.Job] - Job job_local622257889_0001 failed with state FAILED due to: NA 
2017-06-20 23:03:34,598 INFO [org.apache.hadoop.mapreduce.Job] - Counters: 0 

I don't know what this means. Thanks for your help!


Can you also show the command you are using to run the MR job?

Answers


In the end I found that the Mapper & Reducer were not declared as static classes. Once I changed them to static, the job ran correctly.

Why declaring Mapper and Reducer classes as static?

Hadoop uses reflection to create an instance of your class for every map or reduce task it runs. The new instance it creates requires a zero-argument constructor (otherwise how would it know what to pass in).

By declaring an inner mapper or reducer class without the static keyword, the Java compiler actually creates a constructor that requires an instance of the enclosing class to be passed in at construction time.

You should be able to see this by running the javap command against the generated class file.

Also, the static keyword is not valid on a top-level class declaration (which is why you never see it there); it only applies to nested classes.
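As an illustration of that fix (a minimal sketch, keeping the original method bodies and only adding the static keyword to the nested classes inside TVPlay):

    public class TVPlay extends Configured implements Tool { 

        // static nested classes have the zero-argument constructor that 
        // ReflectionUtils.newInstance() needs, so Hadoop can instantiate them 
        public static class TVmapper extends Mapper<Text, TVplaydata, Text, TVplaydata> { 
            public void map(Text key, TVplaydata value, Context context) 
                    throws IOException, InterruptedException { 
                context.write(key, value); 
            } 
        } 

        public static class TVreducer extends Reducer<Text, TVplaydata, Text, Text> { 
            // same setup(), reduce() and cleanup() bodies as in the question 
        } 

        // main() and run() unchanged 
    } 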


You have put the job configuration inside the reducer's run() method, which is wrong. The simplest solution is to move all of that code into TVPlay's main() method.

You generally should not override the run() method unless you are clear about what you are doing, so I would remove it from the reducer class entirely.
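As a rough sketch of that suggestion (assuming the job setup currently in run() is moved into main() largely unchanged, and that the nested Mapper and Reducer classes are declared static as described in the other answer):

    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        Path in = new Path("hdfs://wang:9000/mapreduce/tvplay.txt"); 
        Path out = new Path("hdfs://wang:9000/mapreduce/tvout"); 

        // remove a stale output directory, as the original run() did 
        FileSystem hdfs = out.getFileSystem(conf); 
        if (hdfs.isDirectory(out)) { 
            hdfs.delete(out, true); 
        } 

        Job job = new Job(conf, "tvplay"); 
        job.setJarByClass(TVPlay.class); 
        job.setInputFormatClass(PlayinputFormat.class); 
        job.setMapperClass(TVmapper.class); 
        job.setMapOutputKeyClass(Text.class); 
        job.setMapOutputValueClass(TVplaydata.class); 
        job.setReducerClass(TVreducer.class); 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(Text.class); 
        FileInputFormat.addInputPath(job, in); 
        FileOutputFormat.setOutputPath(job, out); 

        // register the named outputs exactly as the original run() did 
        MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class, Text.class, Text.class); 
        MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class, Text.class, Text.class); 
        MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class, Text.class, Text.class); 
        MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class, Text.class, Text.class); 
        MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class, Text.class, Text.class); 
        MultipleOutputs.addNamedOutput(job, "other", TextOutputFormat.class, Text.class, Text.class); 

        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 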