I have written MapReduce Java code for a single-node cluster without using Tool. Will it work on a multi-node cluster, or are changes needed? The code below tokenizes strings and counts the word frequency of each text file on a single-node cluster; will it work on a multi-node cluster?

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class tr {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final Text word = new Text();
        private final IntWritable one = new IntWritable(1);
        public static List<String> stopwords = new ArrayList<String>();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // Load the stop-word list once; calling addwords() on every map()
            // invocation would re-read the file and fill the list with duplicates.
            if (stopwords.isEmpty()) {
                addwords();
            }

            // Strip non-letters, lowercase, and emit (word, 1) for every
            // token that is not a stop word.
            String line = value.toString().replaceAll("[^A-Za-z]", " ").toLowerCase();
            StringTokenizer st = new StringTokenizer(line);
            while (st.hasMoreTokens()) {
                String imptoken = st.nextToken();
                if (!stopwords.contains(imptoken)) {
                    word.set(imptoken);
                    output.collect(word, one);
                }
            }
        }

        // Read the stop-word file from HDFS, one word per line.
        public void addwords() throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());
            Path stop = new Path("/user/hduser/stopword.txt");
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(stop)));
            String stopword = br.readLine();
            while (stopword != null) {
                stopwords.add(stopword);
                stopword = br.readLine();
            }
            br.close();
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> value, OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (value.hasNext()) {
                sum += value.next().get();
            }
            /*
            Path paths = new Path("/user/hduser/input1/");
            FileSystem fs = FileSystem.get(new Configuration());
            FileStatus[] status = fs.listStatus(paths);
            Path[] list = FileUtil.stat2Paths(status);
            String keystr = key.toString();
            for (Path file : list) {
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(file)));
                String word = br.readLine();
                while (word != null) {
                    if (word.equals(keystr)) {
                        sum = 0;
                    }
                    word = br.readLine();
                }
            }
            */
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());

        // List only the input arguments; the last argument (args[1]) is the
        // output path and must not be listed as input.
        Path[] paths = new Path[args.length - 1];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }

        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);

        // One job per input file, so each file gets its own word counts.
        for (Path p : listedPaths) {
            JobConf conf = new JobConf(tr.class);
            conf.setJobName("tr");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            String name = p.getName();
            String absolutepath = p.getParent().toString() + "/" + name;

            FileInputFormat.setInputPaths(conf, new Path(absolutepath));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);

            // Copy the result out of HDFS, rename it after its input file,
            // and delete the output directory so the next job can reuse it.
            Path local = new Path("/home/hduser/meproj/projectfiles/");
            Path source = new Path(args[1] + "/" + "part-00000");

            fs.copyToLocalFile(source, local);

            File file = new File("/home/hduser/meproj/projectfiles/part-00000");
            file.renameTo(new File("/home/hduser/meproj/projectfiles/" + name));
            fs.delete(new Path(args[1]), true);
        }
    }
}

It should work as-is; it won't break unless you've explicitly coded it to do something machine-specific. Is there any particular reason you're using the old Hadoop API? – Quetzalcoatl 2013-04-30 16:17:42
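(For reference, the "new" API the comment refers to is org.apache.hadoop.mapreduce, where Mapper is a base class rather than an interface and output goes through a single Context instead of the OutputCollector/Reporter pair. A minimal sketch of the question's mapper on that API, with the stop-word filtering elided:)

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// New-API equivalent of the Map class above: extend Mapper and
// write through the Context instead of an OutputCollector.
public class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().replaceAll("[^A-Za-z]", " ").toLowerCase();
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            word.set(st.nextToken());
            context.write(word, one);  // stop-word filtering omitted in this sketch
        }
    }
}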


I am new to Hadoop and haven't properly understood MapReduce programming yet. I don't know how to run a MapReduce program for each file in a corpus, so could you tell me an alternative way to run a MapReduce program per file on single-node and multi-node clusters? – user2200278 2013-04-30 16:32:26

Answer


When you write a Hadoop program, it will work for any cluster setup unless you specifically do something that breaks that, like working on local files on a single machine.
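To make "breaks that" concrete: anything that touches a node-local resource from inside a task works on a single node but falls apart on a cluster. A hypothetical anti-example (the local path is invented for illustration):

import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Anti-example (do NOT do this): a reducer that writes to a local file.
public class LocalFileReduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        // This file is created on whichever node happens to run the reduce
        // task, so on a multi-node cluster the results end up scattered
        // across machines (path is hypothetical).
        FileWriter fw = new FileWriter("/home/hduser/localcounts.txt", true);
        fw.write(key.toString() + "\t" + sum + "\n");
        fw.close();
        output.collect(key, new IntWritable(sum));
    }
}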

You are doing your work in the Mapper and Reducer in a setup-independent way (which you should), so it should work everywhere.

This is unrelated to your question, but you shouldn't loop over the files and run an independent job on each path. Really, you should run one job over all of them. You can put all those individual paths in the same folder and specify that folder as the input, or you can run Hadoop over multiple paths (see answer).
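A sketch of that single-job approach against the question's own classes (the argument convention, all inputs first and the output path last, is an assumption):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

// Hypothetical driver: every input goes to ONE job; last argument is the output.
public class TrDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(tr.class);
        conf.setJobName("tr");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(tr.Map.class);
        conf.setCombinerClass(tr.Reduce.class);
        conf.setReducerClass(tr.Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Add every input argument to the same job. Passing a folder to
        // FileInputFormat.setInputPaths works too: all files inside it
        // become input splits of the one job.
        for (int i = 0; i < args.length - 1; i++) {
            FileInputFormat.addInputPath(conf, new Path(args[i]));
        }
        FileOutputFormat.setOutputPath(conf, new Path(args[args.length - 1]));

        JobClient.runJob(conf);
    }
}

Note that with multiple inputs in one job the counts in the output are totals across all of the inputs, and the output path must not already exist when the job starts.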