2014-12-22 64 views
0

我試圖梳理出的價值秩序,並試圖運行兩個 映射器和減速,但第二個作業開始時失敗,並說:排序中的價值秩序的Hadoop

14/12/21 18點43分35秒ERROR security.UserGroupInformation: PriviledgedActionException爲:Cloudera的(AUTH:SIMPLE) 原因:org.apache.hadoop.mapred.FileAlreadyExistsException:輸出 目錄 HDFS://localhost.localdomain:8020 /用戶/ cloudera/wordcount/output 已存在線程「主」中的異常 org.apache.hadoop.mapred.FileAlreadyExistsException:輸出目錄 HDFS://localhost.localdomain:8020 /用戶/ Cloudera公司/單詞計數/輸出 已經存在

這裏是我的代碼:

package org.myorg; 

import java.io.IOException; 
import java.util.*; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.mapreduce.Job; 

public class WordCount { 

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { 

    private final static IntWritable one = new IntWritable(1); 
    private Text word = new Text(); 

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
     String line = value.toString(); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     while (tokenizer.hasMoreTokens()) { 
      word.set(tokenizer.nextToken()); 
      output.collect(word, one); 
     } 
    } 
} 

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { 

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
     int sum = 0; 
     while (values.hasNext()) { 
      sum += values.next().get(); 
     } 
     output.collect(key, new IntWritable(sum)); 
    } 
} 

class Map1 extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> { 

    public void map(Object key, Text value, OutputCollector<IntWritable, Text> collector, Reporter arg3) throws IOException { 
     String line = value.toString(); 
     StringTokenizer stringTokenizer = new StringTokenizer(line); 
     { 
      int number = 999; 
      String word = "empty"; 

      if (stringTokenizer.hasMoreTokens()) { 
       String str0 = stringTokenizer.nextToken(); 
       word = str0.trim(); 
      } 

      if (stringTokenizer.hasMoreElements()) { 
       String str1 = stringTokenizer.nextToken(); 
       number = Integer.parseInt(str1.trim()); 
      } 
      collector.collect(new IntWritable(number), new Text(word)); 
     } 

    } 

} 

class Reduce1 extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> { 

    public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> arg2, Reporter arg3) throws IOException { 
     while ((values.hasNext())) { 
      arg2.collect(key, values.next()); 
     } 
    } 

} 

public static void main(String[] args) throws Exception { 
    JobConf conf = new JobConf(WordCount.class); 
    conf.setJobName("wordCount"); 

    conf.setOutputKeyClass(Text.class); 
    conf.setOutputValueClass(IntWritable.class); 

    conf.setMapperClass(Map.class); 
    conf.setCombinerClass(Reduce.class); 
    conf.setReducerClass(Reduce.class); 

    conf.setInputFormat(TextInputFormat.class); 
    conf.setOutputFormat(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(conf, new Path(args[0])); 
    FileOutputFormat.setOutputPath(conf, new Path("wordcount/output")); 

//JobClient.runJob(conf); 
    //------------------------------------------------------------------ 
    JobConf conf2 = new JobConf(WordCount.class); 
    conf2.setJobName("WordCount1"); 

    conf2.setOutputKeyClass(Text.class); 
    conf2.setOutputValueClass(IntWritable.class); 

    conf2.setMapperClass(Map1.class); 
    conf2.setCombinerClass(Reduce1.class); 
    conf2.setReducerClass(Reduce1.class); 

    conf2.setInputFormat(TextInputFormat.class); 
    conf2.setOutputFormat(TextOutputFormat.class); 

    FileInputFormat.setInputPaths(conf2, new Path("wordcount/output/part-00000")); 
    FileOutputFormat.setOutputPath(conf2, new Path(args[1])); 

    Job job1 = new Job(conf); 
    Job job2 = new Job(conf2); 

    job1.submit(); 
if (job1.waitForCompletion(true)) { 
    job2.submit(); 
    job2.waitForCompletion(true); 
    } 

    } 
} 

我試圖改變路徑幾次甚至創建一個名爲tmp的新直接,但沒有運氣。

當前錯誤消息:

14/12/21 19:58:12 INFO mapred.JobClient: Running job: job_201412211623_0042 
    14/12/21 19:58:13 INFO mapred.JobClient: map 0% reduce 0% 
    14/12/21 19:58:35 INFO mapred.JobClient: Task Id :  attempt_201412211623_0042_m_000001_0, Status : FAILED 
    java.lang.RuntimeException: Error in configuring object 
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) 
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:413) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438) 
    at org.apache.hadoop.mapred.Child.main(Child.java:262) 
Caused by: java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.ja 
14/12/21 19:58:35 INFO mapred.JobClient: Task Id : attempt_201412211623_0042_m_000000_0, Status : FAILED 
java.lang.RuntimeException: Error in configuring object 
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) 
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75) 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:413) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438) 
    at org.apache.hadoop.mapred.Child.main(Child.java:262) 
Caused by: java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.ja 
14/12/21 19:58:54 INFO mapred.JobClient: Task Id : attempt_201412211623_0042_m_000001_1, Status : FAILED 
java.lang.RuntimeException: Error in configuring object 
+0

我已經嘗試改變路徑afew時間甚至創建一個新的直接調用tmp,但沒有運氣 – Roland

+0

你如何傳遞輸入和輸出路徑? – Sandy

+0

對不起,我沒有得到你的意思是在這個hadoop真的很糟糕的東西,真的很糟糕>< – Roland

回答

0

我建議你使用新的API

這個例子是基於新的API

public class ChainJobs extends Configured implements Tool { 

private static final String OUTPUT_PATH = "intermediate_output"; 

@Override 
public int run(String[] args) throws Exception { 
    /* 
    * Job 1 
    */ 
    Configuration conf = getConf(); 
    FileSystem fs = FileSystem.get(conf); 
    Job job = new Job(conf, "Job1"); 
    job.setJarByClass(ChainJobs.class); 

    job.setMapperClass(MyMapper1.class); 
    job.setReducerClass(MyReducer1.class); 

    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 

    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    TextInputFormat.addInputPath(job, new Path(args[0])); 
    TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); 

    job.waitForCompletion(true); 

    /* 
    * Job 2 
    */ 
    Configuration conf2 = getConf(); 
    Job job2 = new Job(conf2, "Job 2"); 
    job2.setJarByClass(ChainJobs.class); 

    job2.setMapperClass(MyMapper2.class); 
    job2.setReducerClass(MyReducer2.class); 

    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(Text.class); 

    job2.setInputFormatClass(TextInputFormat.class); 
    job2.setOutputFormatClass(TextOutputFormat.class); 

    TextInputFormat.addInputPath(job2, new Path(OUTPUT_PATH)); 
    TextOutputFormat.setOutputPath(job2, new Path(args[1])); 

    return job2.waitForCompletion(true) ? 0 : 1; 
} 

private static final String OUTPUT_PATH = "intermediate_output";爲第一個作業輸出定義,它將作爲第二個作業的輸入。

參考this

希望這有助於。

+0

希望鏈接幫助你:) –

0

所有的錯誤說是wordcount/output目錄已經存在。我看到你對第一個MR作業的輸出目錄的值(FileOutputFormat.setOutputPath(conf, new Path("wordcount/output"));)進行了硬編碼。

如果您已經存在該目錄(output),那麼該作業將失敗,因爲它會阻止您重寫某些內容。嘗試刪除該目錄並使用新目錄運行作業。

+0

像這樣的目錄 Job1: FileInputFormat.setInputPaths(conf,new Path(args [0])); FileOutputFormat。setOutputPath(conf,new Path(「tmp」)); Job2: FileInputFormat.setInputPaths(conf2,new Path(「tmp/part-00000」)); FileOutputFormat.setOutputPath(conf2,new Path(args [1])); 我得到一個錯誤,說:java.lang.RuntimeException:配置對象時出錯 – Roland

+0

FileOutputFormat.setOutputPath(conf2,new Path(args [1]));'args [1]也與'tmp相同'在上面的情況下...... –

+0

所以我應該做一個完全新的目錄像temp/tempfiles?並在那裏存儲第一個輸出? – Roland

0

您的兩個不同的縮減工作可能正試圖在同一位置寫入。在hdfs中,我們無法更新或覆蓋。如果要再次在同一位置寫入,則需要刪除退出的文件,目錄位置。

以下是一些有用的參考

chaining-multiple-mapreduce-jobs-in-hadoop

job chaining