2013-02-10 34 views
2

我在Hadoop上實現PageRank算法。正如標題所示,嘗試執行代碼時出現了以下錯誤:映射鍵類型不匹配(Type mismatch in key from map):預期 org.apache.hadoop.io.Text,實際收到 org.apache.hadoop.io.LongWritable。

類型不匹配:預計org.apache.hadoop.io.Text,收到org.apache.hadoop.io.LongWritable

在我的輸入文件中,我將圖形節點ID作爲關鍵字和一些關於它們的信息存儲爲值。我的輸入文件的格式如下:

1\t3.4,2,5,6,67

4\t4.2,77,2,7,83

... ..

試圖瞭解錯誤說什麼我嘗試使用LongWritable作爲我的主變量類型,如下面的代碼中所示。這意味着我有:

Map<LongWritable, LongWritable, LongWritable, LongWritable>

Reduce<LongWritable, LongWritable, LongWritable, LongWritable>

而且,我也試過:

Map<Text, Text, Text, Text>

Reduce<Text, Text, Text, Text>

也:

Map<LongWritable, Text, LongWritable, Text>

Reduce<LongWritable, Text, LongWritable, Text>

但我總是得到同樣的錯誤。我想我很難理解錯誤信息中「預期(expected)」和「收到(received)」的含義。這是否意味着我的map函數期望從輸入文件得到LongWritable,而實際拿到的是Text?是我使用的輸入文件格式有問題,還是變量類型有問題?

下面是完整的代碼,你能告訴我什麼改變在哪裏?:

import java.io.IOException; 
import java.util.*; 
import java.util.regex.Matcher; 
import java.util.regex.Pattern; 
import java.lang.Object.*; 

import org.apache.commons.cli.ParseException; 
import org.apache.commons.lang.StringUtils; 
import org.apache.commons.configuration.Configuration; 
import org.apache.hadoop.security.Credentials; 
import org.apache.log4j.*; 
import org.apache.commons.logging.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.DoubleWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 
import org.apache.hadoop.util.GenericOptionsParser; 



public class Pagerank 
{ 


public static class PRMap extends Mapper<LongWritable, LongWritable, LongWritable, LongWritable> 
{ 

    public void map(LongWritable lineNum, LongWritable line, OutputCollector<LongWritable, LongWritable> outputCollector, Reporter reporter) throws IOException, InterruptedException 
    { 
     if (line.toString().length() == 0) { 
      return; 
     } 

     Text key = new Text(); 
     Text value = new Text(); 
     LongWritable valuel = new LongWritable(); 
     StringTokenizer spline = new StringTokenizer(line.toString(),"\t"); 
     key.set(spline.nextToken()); 
     value.set(spline.nextToken()); 

     valuel.set(Long.parseLong(value.toString())); 
     outputCollector.collect(lineNum,valuel); 


     String info = value.toString(); 
     String splitter[] = info.split(","); 

     if(splitter.length >= 3) 
     { 
      float f = Float.parseFloat(splitter[0]); 
      float pagerank = f/(splitter.length - 2); 

      for(int i=2;i<splitter.length;i++) 
      { 
       LongWritable key2 = new LongWritable(); 
       LongWritable value2 = new LongWritable(); 
       long l; 

       l = Long.parseLong(splitter[i]); 
       key2.set(l); 
       //key2.set(splitter[i]); 
       value2.set((long)f); 

       outputCollector.collect(key2, value2); 
      } 
     } 
    } 
} 

public static class PRReduce extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable> 
{ 
    private Text result = new Text(); 
    public void reduce(LongWritable key, Iterator<LongWritable> values,OutputCollector<LongWritable, LongWritable> results, Reporter reporter) throws IOException, InterruptedException 
    { 

     float pagerank = 0; 
     String allinone = ","; 
     while(values.hasNext()) 
     { 
      LongWritable temp = values.next(); 
      String converted = temp.toString(); 
      String[] splitted = converted.split(","); 

      if(splitted.length > 1) 
      {     
       for(int i=1;i<splitted.length;i++) 
       { 
        allinone = allinone.concat(splitted[i]); 
        if(i != splitted.length - 1) 
         allinone = allinone.concat(","); 
       } 
      } 
      else 
      { 
       float f = Float.parseFloat(splitted[0]); 
       pagerank = pagerank + f; 
      } 
     } 
     String last = Float.toString(pagerank); 
     last = last.concat(allinone); 

     LongWritable value = new LongWritable(); 
     value.set(Long.parseLong(last)); 

     results.collect(key, value); 
    }  
} 



public static void main(String[] args) throws Exception 
{  


    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); 

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 
    if (otherArgs.length != 2) { 
     System.err.println("Usage: wordcount <in> <out>"); 
     System.exit(2); 
    } 

    Job job = new Job(conf, "pagerank_itr0"); 

    job.setJarByClass(Pagerank.class);  
    job.setMapperClass(Pagerank.PRMap.class);  
    job.setReducerClass(Pagerank.PRReduce.class);  


    job.setOutputKeyClass(LongWritable.class);    
    job.setOutputValueClass(LongWritable.class);    
    FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
    job.waitForCompletion(true); 

} 
} 

回答

3

您沒有在作業配置中設置Map輸出的鍵/值類。請嘗試在作業中使用以下方法進行設置:

setMapOutputKeyClass();

setMapOutputValueClass();

相關問題