2013-08-07
1

I'm new to Hadoop, and I'm trying the wordcount/secondsort examples in src/examples. Hadoop: why does reduce output appear between "Starting flush of map output" and "Finished spill 0", before the map has finished?

wordcount test setup:

Input:

file01.txt

file02.txt

secondsort test setup:

Input:

sample01.txt

sample02.txt

This means both tests have 2 input paths to process. I printed some log messages to try to understand the map/reduce process.

What happens between "Starting flush of map output" and "Finished spill 0"? The wordcount program runs reduce two extra times before the final reduce, while the secondsort program runs reduce only once and is done. Since these programs are so "small", I don't think io.sort.mb/io.sort.factor should affect this.

Can someone explain this?

Thanks for your patience with my English and the long logs.

Here are the log messages (I cut some useless parts to keep them short):

wordcount log: 

[[email protected] ~]$ hadoop jar test.jar com.abc.example.test wordcount output 
13/08/07 18:14:05 INFO mapred.FileInputFormat: Total input paths to process : 2 
13/08/07 18:14:06 INFO mapred.JobClient: Running job: job_local_0001 
13/08/07 18:14:06 INFO util.ProcessTree: setsid exited with exit code 0 
... 
13/08/07 18:14:06 INFO mapred.MapTask: numReduceTasks: 1 
13/08/07 18:14:06 INFO mapred.MapTask: io.sort.mb = 100 
13/08/07 18:14:06 INFO mapred.MapTask: data buffer = 79691776/99614720 
13/08/07 18:14:06 INFO mapred.MapTask: record buffer = 262144/327680 
Mapper: 0 | Hello Hadoop GoodBye Hadoop 
13/08/07 18:14:06 INFO mapred.MapTask: **Starting flush of map output** 
Reduce: GoodBye 
Reduce: GoodBye | 1 
Reduce: Hadoop 
Reduce: Hadoop | 1 
Reduce: Hadoop | 1 
Reduce: Hello 
Reduce: Hello | 1 
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 
13/08/07 18:14:06 INFO mapred.LocalJobRunner: hdfs://localhost:8020/user/hadoop/wordcount/file02.txt:0+28 
13/08/07 18:14:06 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done. 
13/08/07 18:14:06 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
13/08/07 18:14:06 INFO mapred.MapTask: numReduceTasks: 1 
13/08/07 18:14:06 INFO mapred.MapTask: io.sort.mb = 100 
13/08/07 18:14:06 INFO mapred.MapTask: data buffer = 79691776/99614720 
13/08/07 18:14:06 INFO mapred.MapTask: record buffer = 262144/327680 
13/08/07 18:14:06 INFO mapred.MapTask: **Starting flush of map output** 
Reduce: Bye 
Reduce: Bye | 1 
Reduce: Hello 
Reduce: Hello | 1 
Reduce: world 
Reduce: world | 1 
Reduce: world | 1 
13/08/07 18:14:06 INFO mapred.MapTask: **Finished spill 0** 
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting 
13/08/07 18:14:06 INFO mapred.LocalJobRunner: hdfs://localhost:8020/user/hadoop/wordcount/file01.txt:0+22 
13/08/07 18:14:06 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done. 
13/08/07 18:14:06 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
13/08/07 18:14:06 INFO mapred.LocalJobRunner: 
13/08/07 18:14:06 INFO mapred.Merger: Merging 2 sorted segments 
13/08/07 18:14:06 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 77 bytes 
13/08/07 18:14:06 INFO mapred.LocalJobRunner: 
Reduce: Bye 
Reduce: Bye | 1 
Reduce: GoodBye 
Reduce: GoodBye | 1 
Reduce: Hadoop 
Reduce: Hadoop | 2 
Reduce: Hello 
Reduce: Hello | 1 
Reduce: Hello | 1 
Reduce: world 
Reduce: world | 2 
13/08/07 18:14:06 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting 
... 
13/08/07 18:14:07 INFO mapred.JobClient:  Reduce input groups=5 
13/08/07 18:14:07 INFO mapred.JobClient:  Combine output records=6 
13/08/07 18:14:07 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
13/08/07 18:14:07 INFO mapred.JobClient:  Reduce output records=5 
13/08/07 18:14:07 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
13/08/07 18:14:07 INFO mapred.JobClient:  Map output records=8 

secondsort log info: 

[[email protected] ~]$ hadoop jar example.jar com.abc.example.example secondsort output 
13/08/07 17:00:11 INFO input.FileInputFormat: Total input paths to process : 2 
13/08/07 17:00:11 WARN snappy.LoadSnappy: Snappy native library not loaded 
13/08/07 17:00:12 INFO mapred.JobClient: Running job: job_local_0001 
13/08/07 17:00:12 INFO util.ProcessTree: setsid exited with exit code 0 
13/08/07 17:00:12 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
13/08/07 17:00:12 INFO mapred.MapTask: io.sort.mb = 100 
13/08/07 17:00:12 INFO mapred.MapTask: data buffer = 79691776/99614720 
13/08/07 17:00:12 INFO mapred.MapTask: record buffer = 262144/327680 
Map: 0 | 5 49 
Map: 5 | 9 57 
Map: 10 | 19 46 
Map: 16 | 3 21 
Map: 21 | 9 48 
Map: 26 | 7 57 
... 
13/08/07 17:00:12 INFO mapred.MapTask: **Starting flush of map output** 
13/08/07 17:00:12 INFO mapred.MapTask: **Finished spill 0** 
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done. 
13/08/07 17:00:12 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
13/08/07 17:00:12 INFO mapred.MapTask: io.sort.mb = 100 
13/08/07 17:00:12 INFO mapred.MapTask: data buffer = 79691776/99614720 
13/08/07 17:00:12 INFO mapred.MapTask: record buffer = 262144/327680 
Map: 0 | 20 21 
Map: 6 | 50 51 
Map: 12 | 50 52 
Map: 18 | 50 53 
Map: 24 | 50 54 
... 
13/08/07 17:00:12 INFO mapred.MapTask: **Starting flush of map output** 
13/08/07 17:00:12 INFO mapred.MapTask: **Finished spill 0** 
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting 
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done. 
13/08/07 17:00:12 INFO mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
13/08/07 17:00:12 INFO mapred.Merger: Merging 2 sorted segments 
13/08/07 17:00:12 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 1292 bytes 
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
Reduce: 0:35 ----------------- 
Reduce: 0:35 | 35 
Reduce: 0:54 ----------------- 
... 
13/08/07 17:00:12 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting 
13/08/07 17:00:12 INFO mapred.LocalJobRunner: 
13/08/07 17:00:12 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now 
13/08/07 17:00:12 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to output 
13/08/07 17:00:12 INFO mapred.LocalJobRunner: reduce > reduce 
13/08/07 17:00:12 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done. 
13/08/07 17:00:13 INFO mapred.JobClient: map 100% reduce 100% 
13/08/07 17:00:13 INFO mapred.JobClient: Job complete: job_local_0001 
13/08/07 17:00:13 INFO mapred.JobClient: Counters: 22 
13/08/07 17:00:13 INFO mapred.JobClient: File Output Format Counters 
13/08/07 17:00:13 INFO mapred.JobClient:  Bytes Written=4787 
... 
13/08/07 17:00:13 INFO mapred.JobClient:  SPLIT_RAW_BYTES=236 
13/08/07 17:00:13 INFO mapred.JobClient:  Reduce input records=92 

PS: the main() methods are below for reference.

wordcount:

public static void main(String[] args) throws Exception { 
    JobConf conf = new JobConf(test.class); 
    conf.setJobName("wordcount"); 

    conf.setOutputKeyClass(Text.class); 
    conf.setOutputValueClass(IntWritable.class); 

    conf.setMapperClass(Map.class); 
    conf.setCombinerClass(Reduce.class); 
    conf.setReducerClass(Reduce.class); 

    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(conf, new Path(args[0])); 
    FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

    JobClient.runJob(conf); 
    } 

secondsort:

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException 
    { 
     Configuration conf = new Configuration(); 
     Job job = new Job(conf, "secondarysort"); 
     job.setJarByClass(example.class); 
     job.setMapperClass(Map.class); 
     job.setReducerClass(Reduce.class); 
     job.setPartitionerClass(FirstPartitioner.class); 
     job.setGroupingComparatorClass(GroupingComparator.class); 

     job.setMapOutputKeyClass(IntPair.class); 
     job.setMapOutputValueClass(IntWritable.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(IntWritable.class); 

     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     FileInputFormat.setInputPaths(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 

Answers

1

Combine output records=6

That explains everything: your reduce function is used both as the combiner and as the reducer, so what you're seeing is combiner output. The combiner is (sometimes) invoked when the map output is spilled.
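
To make the counter arithmetic concrete, here is a plain-Java sketch (no Hadoop classes; `CombineSpillDemo` and its method names are made up for illustration) of the path the records take: the combiner runs on each map task's spill and collapses duplicates within that one spill, and the reducer then merges the combined segments across maps. With the question's two input files, this reproduces the counters `Map output records=8`, `Combine output records=6`, and `Reduce output records=5`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CombineSpillDemo {

    // Stand-in for the combiner: Hadoop runs it (sometimes) while a map
    // task spills its in-memory buffer, so it only sees one spill at a time.
    public static Map<String, Integer> combine(List<String> words) {
        Map<String, Integer> out = new TreeMap<>();
        for (String w : words) out.merge(w, 1, Integer::sum);
        return out;
    }

    // Stand-in for the reducer: merges the combined segments of all maps.
    public static Map<String, Integer> reduce(List<Map<String, Integer>> segments) {
        Map<String, Integer> out = new TreeMap<>();
        for (Map<String, Integer> seg : segments)
            seg.forEach((k, v) -> out.merge(k, v, Integer::sum));
        return out;
    }

    public static void main(String[] args) {
        // The question's two files: 8 map output records in total.
        List<String> file02 = Arrays.asList("Hello", "Hadoop", "GoodBye", "Hadoop");
        List<String> file01 = Arrays.asList("Hello", "world", "Bye", "world");

        Map<String, Integer> spill0 = combine(file02); // printed during map 0's flush
        Map<String, Integer> spill1 = combine(file01); // printed during map 1's flush
        Map<String, Integer> merged = reduce(Arrays.asList(spill0, spill1));

        System.out.println("Combine output records=" + (spill0.size() + spill1.size())); // 6
        System.out.println("Reduce output records=" + merged.size()); // 5
    }
}
```

This is why the "Reduce: …" lines show up in the middle of the map phase: they come from the combiner call made during the spill, not from the real reduce task.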

I think you should have added your code, at least the main() parts, to show us how your job is set up. That would make your question easier to answer.

+0

If you don't want any combining, remove the following line from your main(): 'conf.setCombinerClass(Reduce.class);' – DDW

+0

If this answers your question, please upvote. – DDW

-1

I think lines such as

Reduce: Bye

Reduce: Bye | 1

are println(...) calls in your source code; you need to check your source.
