2014-01-24 45 views
0

我試圖找出使用Hadoop的文本中最常用的單詞。 Hadoop是一個允許跨計算機集羣分佈式處理大型數據集的框架。文本中的常見單詞

我知道這可以通過使用Unix命令輕鬆完成:job: sort -n -k2 txtname | tail。但是這並不能擴展到大型數據集。所以我試圖解決問題,然後結合結果。

這裏是我WordCount類:

import java.util.Arrays; 
    import org.apache.commons.lang.StringUtils; 
    import org.apache.hadoop.conf.Configuration; 
    import org.apache.hadoop.fs.Path; 
    import org.apache.hadoop.io.IntWritable; 
    import org.apache.hadoop.io.Text; 
    import org.apache.hadoop.mapreduce.Job; 
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

    public class WordCount { 
     public static void runJob(String[] input, String output) throws Exception { 
     Configuration conf = new Configuration(); 
     Job job = new Job(conf); 
     job.setJarByClass(WordCount.class); 

     job.setMapperClass(TokenizerMapper.class); 
     job.setReducerClass(IntSumReducer.class); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(IntWritable.class); 

     Path outputPath = new Path(output); 
     FileInputFormat.setInputPaths(job, StringUtils.join(input, ",")); 
     FileOutputFormat.setOutputPath(job, outputPath); 
     outputPath.getFileSystem(conf).delete(outputPath,true); 
     job.waitForCompletion(true); 
     } 

     public static void main(String[] args) throws Exception { 
     runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]); 
     } 
    } 

我明白,我需要一個額外的任務並行工作與地圖減少字數類。

這裏是我TokenizerMapper類:

import java.io.IOException; 
    import java.util.StringTokenizer; 
    import org.apache.hadoop.io.IntWritable; 
    import org.apache.hadoop.io.Text; 
    import org.apache.hadoop.mapreduce.Mapper; 

    public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { 
     private final IntWritable one = new IntWritable(1); 
     private Text data = new Text(); 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
     StringTokenizer itr = new StringTokenizer(value.toString(), "-- \t\n\r\f,.:;?![]'\""); 

     while (itr.hasMoreTokens()) { 
      data.set(itr.nextToken().toLowerCase()); 
      context.write(data, one); 
     } 
     } 
    } 

這裏是我IntSumReducer類:

import java.io.IOException; 
    import java.util.Iterator; 
    import org.apache.hadoop.io.IntWritable; 
    import org.apache.hadoop.io.Text; 
    import org.apache.hadoop.mapreduce.Reducer; 

    public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { 
     private IntWritable result = new IntWritable(); 
     public void reduce(Text key, Iterable<IntWritable> values, Context context) 

     throws IOException, InterruptedException { 
     int sum = 0; 

     for (IntWritable value : values) { 
      // TODO: complete code here 
      sum+=value.get(); 
     } 

     result.set(sum); 

     // TODO: complete code here 

     if (sum>3) { 
      context.write(key,result); 
     } 
     } 
    } 

什麼,我需要做的是定義另一個Map和Reduce類,將並行工作與此電流一。最出現的單詞會出現,這是我對減少類到目前爲止:

import java.io.IOException; 
import java.util.Iterator; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.Reducer.Context; 

public class reducer2 extends Reducer<Text, IntWritable, Text, IntWritable> { 
    int max_sum =0; 
    Text max_occured_key = new Text(); 

    private IntWritable result = new IntWritable(); 

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 

    throws IOException, InterruptedException { 
    int sum = 0; 

    for (IntWritable value : values) { 
     // TODO: complete code here 
     sum+=value.get(); 
    } 

    if (sum >max_sum) { 
     max_sum = sum; 
     max_occured_key.set(key); 
    } 

    context.write(max_occured_key, new IntWritable(max_sum)); 

    //result.set(sum); 

    // TODO: complete code here 

    /* 
    if (sum>3) { 
     context.write(key,result); 
    } 
    */ 
    } 

    protected void cleanup(Context context) throws IOException, InterruptedException { 
    context.write(max_occured_key, new IntWritable(max_sum)); 
    } 
} 

mapper2代碼:

import java.io.IOException; 
import java.util.StringTokenizer; 

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Mapper.Context; 

public class mapper2 { 
    private final IntWritable one = new IntWritable(1); 
    private Text data = new Text(); 

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
    StringTokenizer itr = new StringTokenizer(value.toString(), "-- \t\n\r\f,.:;?![]'\""); 
    int count =0; 

    while (itr.hasMoreTokens()) {  
     //data.set(itr.nextToken().toLowerCase());   
     context.write(data, one); 
    } 
    } 
} 

我還編輯了WordCount類,這樣兩項工作能同時運行:

import java.util.Arrays; 
import org.apache.commons.lang.StringUtils; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class WordCount { 
    public static void runJob(String[] input, String output) throws Exception { 
    Configuration conf = new Configuration(); 
    Job job = new Job(conf); 
    job.setJarByClass(WordCount.class); 

    job.setMapperClass(TokenizerMapper.class); 
    job.setReducerClass(IntSumReducer.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(IntWritable.class); 

    Path outputPath = new Path(output); 
    FileInputFormat.setInputPaths(job, StringUtils.join(input, ",")); 
    FileOutputFormat.setOutputPath(job, outputPath); 
    outputPath.getFileSystem(conf).delete(outputPath,true); 
    job.waitForCompletion(true); 

    Job job2 = new Job(conf); 
    job2.setJarByClass(WordCount.class); 

    job2.setMapperClass(TokenizerMapper.class); 
    job2.setReducerClass(reducer2.class); 
    job2.setMapOutputKeyClass(Text.class); 
    job2.setMapOutputValueClass(IntWritable.class); 

    Path outputPath2 = new Path(output); 
    FileInputFormat.setInputPaths(job, StringUtils.join(input, ",")); 
    FileOutputFormat.setOutputPath(job, outputPath); 
    outputPath.getFileSystem(conf).delete(outputPath,true); 
    job.waitForCompletion(true); 
    } 

    public static void main(String[] args) throws Exception { 
     runJob(Arrays.copyOfRange(args, 0, args.length-1), args[args.length-1]); 
    } 
} 

如何在使用hadoop的文本中找出最常見的單詞?

+0

你的問題是什麼? –

+0

我試圖找出使用hadoop的文本中最常用的單詞並將其打印出來 –

回答

0

這是規範的字數問題,你可以谷歌和找到任何數量的基本字數的解決方案。然後,您只需再執行一個步驟:返回計數最多的單詞。

如何做到這一點?

如果數據量不是很大,並且您可以負擔得起使用一個reducer,那麼將reducers的數量設置爲1.在您的reduce內部保留一個局部變量,該變量記住哪個組(即單詞)具有/具有最高的計數。然後將結果寫入HDFS中的文件。

如果數據量排除使用單個reducer,那麼您只需要在上面提到的第一個步驟之外增加一個步驟:您需要找到所有reducer中的最高計數。你可以通過一個全局計數器或者通過將每個單獨的最大單詞寫入他們自己的(小)文件中的hdfs並且具有後處理步驟(可能是一個Linux腳本)來解析並獲得最大值的最大值。或者你可以有另一個地圖/減少工作找到它 - 但這對於那個小型/簡單的操作來說有點矯枉過正。

0

我想你錯了sort -n -k2不會在這種情況下的規模工作。 WordCount可能永遠不會輸出數千或數萬個單詞的順序。這僅僅是由於大多數自然語言的基數。因此,即使您有10 PB的數據,它仍然會被提取到最多10,000或20,000個字(雖然計數很高)。

首先,讓您的WordCount作業在數據上運行。然後,用一些bash把頂部的N拉出來。

hadoop fs -cat /output/of/wordcount/part* | sort -n -k2 | tail -n20 

如果由於某種原因,你得到一噸的話出單詞計數(即,你不這樣做自然語言)...

有兩臺MapReduce作業:

  1. 字計數:統計所有的話(幾乎按示例)
  2. TOPN:該發現的東西頂N(A MapReduce工作這裏有一些例子:source code,blog post

讓WordCount的輸出寫入HDFS。然後,讓TopN讀取該輸出。這就是所謂的工作鏈,有很多方法來解決這個問題:oozie,bash腳本,從你的驅動程序發射兩個工作,等等。

你需要兩個工作的原因是你正在做兩個聚合:一個是字數,第二個是topN。通常在MapReduce中,每個聚合需要它自己的MapReduce作業。

相關問題