2013-01-15 61 views
0

我試圖在本地使用MiniMRYarnCluster運行MR作業。 我用老的MapReduce(未紗)和MapReduce API V2 這東西可以在這裏找到:MiniMRYarnCluster,本地運行MR

<dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId> 
      <version>2.0.0-cdh4.1.1</version> 
      <type>test-jar</type> 
      <scope>test</scope> 
     </dependency> 

這裏是日誌的一部分:

--127.0.1.1-58175-1358256748215, blockid: BP-1072059606-127.0.1.1-1358256746988:blk_6137856716359201843_1008, duration: 229871 
13/01/15 17:32:34 INFO localizer.LocalizedResource: Resource hdfs://localhost:50123/apps_staging_dir/ssa/.staging/job_1358256748507_0001/job.xml transitioned from DOWNLOADING to LOCALIZED 
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZING to LOCALIZED 
13/01/15 17:32:34 INFO container.Container: Container container_1358256748507_0001_01_000001 transitioned from LOCALIZED to RUNNING 
13/01/15 17:32:34 INFO nodemanager.DefaultContainerExecutor: launchContainer: [bash, /home/ssa/devel/POIClusterMapreduceTest/ru.mrjob.poi.POIClusterMapreduceTest-localDir-nm-0_1/usercache/ssa/appcache/application_1358256748507_0001/container_1358256748507_0001_01_000001/default_container_executor.sh] 
13/01/15 17:32:34 WARN nodemanager.DefaultContainerExecutor: Exit code from task is : 1 
13/01/15 17:32:34 INFO nodemanager.ContainerExecutor: 
13/01/15 17:32:34 WARN launcher.ContainerLaunch: Container exited with a non-zero exit code 1 

這裏是一個例外:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/service/CompositeService 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClassCond(ClassLoader.java:631) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:615) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:283) 
    at java.net.URLClassLoader.access$000(URLClassLoader.java:58) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:197) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247) 
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.service.CompositeService 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247) 
    ... 12 more 
Could not find the main class: org.apache.hadoop.mapreduce.v2.app.MRAppMaster. Program will exit. 

我用org.apache.hadoop.mapreduce.v2.TestMRJobs作爲我自己的測試基地。有沒有人遇到過這個問題?

這裏是我的代碼,它的抽象基類CI服務器或開發人員的機器上本地測試MR工作:

public abstract class AbstractClusterMapReduceTest { 

    private static final Log LOG = LogFactory.getLog(AbstractClusterMapReduceTest.class); 

    public static final String DEFAULT_LOG_CATALOG = "local-mr-logs"; 

    private static final int DEFAULT_NAMENODE_PORT = 50123; 
    private static final int ONE_DATANODE = 1; 

    private static final int DEFAULT_REDUCE_NUM_TASKS = 1; 
    private static final String SLASH = "/"; 
    private static final String DEFAULT_MR_INPUT_DATA_FILE = "mr-input-data-file"; 

    private MiniMRYarnCluster mrCluster; 
    private MiniDFSCluster dfsCluster; 

    /** Shitty code from base Cloudera example*/ 
    private static Path TEST_ROOT_DIR = new Path("target", 
      AbstractClusterMapReduceTest.class.getName() + "-tmpDir").makeQualified(getLocalFileSystem()); 
    static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar"); 


    private static FileSystem getLocalFileSystem(){ 
     try { 
      return FileSystem.getLocal(new Configuration()); 
     } catch (IOException e) { 
      throw new Error("Can't access local file system. MR cluster can't be started", e); 
     } 
    } 

    /** 
    * Always provide path to log catalog. 
    * Default is: ${project.build.directory}/{@link AbstractClusterMapReduceTest#DEFAULT_LOG_CATALOG} 
    * */ 
    protected String getPathToLogCatalog(){ 
     return getPathToOutputDirectory()+ SLASH + DEFAULT_LOG_CATALOG; 
    } 

    private String getPathToOutputDirectory(){ 
     return System.getProperty("project.build.directory"); 
    } 

    private void checkAppJar(){ 
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { 
      throw new Error("MRAppJar " + MiniMRYarnCluster.APPJAR+ " not found. Not running test."); 
     }else{ 
      LOG.info(MiniMRYarnCluster.APPJAR + " is at the right place. Can continue to setup Env..."); 
     } 
    } 

    public void setupEnv() throws IOException{ 
     checkAppJar(); 

     System.setProperty("hadoop.log.dir", getPathToLogCatalog()); 
     System.setProperty("javax.xml.parsers.SAXParserFactory", 
       "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl"); 

     dfsCluster = buildMiniDFSCluster(); 
     //dfsCluster.getFileSystem().makeQualified(createPath(getHDFSPathToInputData())); 
     //dfsCluster.getFileSystem().makeQualified(createPath(getOutputPath())); 

     mrCluster = new MiniMRYarnCluster(this.getClass().getName(), 1); 
     Configuration conf = new Configuration(); 
     conf.set("fs.defaultFS", getFileSystem().getUri().toString()); // use HDFS 
     //conf.set(MRJobConfig.MR_AM_STAGING_DIR, getPathToOutputDirectory()+"/tmp-mapreduce"); 
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir"); 
     mrCluster.init(conf); 
     mrCluster.start(); 

     //Cloudera tricks :) 
     // Copy MRAppJar and make it private. TODO: FIXME. This is a hack to 
     // workaround the absent public discache. 
     getLocalFileSystem().copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR); 
     getLocalFileSystem().setPermission(APP_JAR, new FsPermission("700")); 
    } 

    public void tearDown() { 
     if (mrCluster != null) { 
      mrCluster.stop(); 
      mrCluster = null; 
     } 
     if (dfsCluster != null) { 
      dfsCluster.shutdown(); 
      dfsCluster = null; 
     } 
    } 

    public boolean createAndSubmitJob() throws IOException, ClassNotFoundException, InterruptedException{ 
     LOG.info("createAndSubmitJob: enter"); 
     checkAppJar(); 
     LOG.info("MRAppJar has been found. Can start to create Job"); 

     Configuration configuration = mrCluster.getConfig(); 
     configuration.set(MRConfig.MASTER_ADDRESS, "local"); 

     Job job = Job.getInstance(configuration); 
     job.setJobName(this.getClass().getSimpleName()+"-job"); 
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. 

     job.setJarByClass(getMRJobClass()); 
     job.setJobName(getMRJobClass().getSimpleName()); 

     job.setNumReduceTasks(getReduceNumTasks()); 

     job.setOutputKeyClass(getOutputKeyClass()); 
     job.setOutputValueClass(getOutputValueClass()); 

     job.setMapperClass(getMapperClass()); 
     job.setReducerClass(getReducerClass()); 

     job.setInputFormatClass(getInputFormat()); 
     job.setOutputFormatClass(getOutputFormat()); 

     FileInputFormat.setInputPaths(job, getHDFSPathToInputData()); 
     FileOutputFormat.setOutputPath(job, createPath(getOutputPath())); 

     job.setSpeculativeExecution(false); 
     job.setMaxMapAttempts(1); // speed up failures 

     LOG.info("Submitting job..."); 
     job.submit(); 
     LOG.info("Job has been submitted."); 

     String trackingUrl = job.getTrackingURL(); 
     String jobId = job.getJobID().toString(); 
     LOG.info("trackingUrl:" +trackingUrl); 
     LOG.info("jobId:" +jobId); 

     return job.waitForCompletion(true); 
    } 


    protected FileSystem getFileSystem() throws IOException { 
     return dfsCluster.getFileSystem(); 
    } 


    protected int getReduceNumTasks(){ 
     return DEFAULT_REDUCE_NUM_TASKS; 
    } 


    /** 
    * @return InputStream instance to file you want to run with your MR job 
    * */ 
    protected InputStream getInputStreamForInputData() { 
     return this.getClass().getClassLoader().getResourceAsStream(this.getClass().getSimpleName()+"/"+getInputDatasetName()); 
     //return getPathToOutputDirectory()+ SLASH + DEFAULT_INPUT_CATALOG+"/mr-input-data"; 
    } 

    protected String getHDFSPathToInputData() throws IOException{ 
     InputStream inputStream = getInputStreamForInputData(); 
     Path hdfsInputPath = new Path(DEFAULT_MR_INPUT_DATA_FILE); 
     FSDataOutputStream fsDataOutputStream = getFileSystem().create(hdfsInputPath); 
     copyStream(inputStream, fsDataOutputStream); 
     fsDataOutputStream.close(); 
     inputStream.close(); 

     return hdfsInputPath.toString(); 
    } 

    private void copyStream(InputStream input, OutputStream output) throws IOException { 
     byte[] buffer = new byte[1024]; // Adjust if you want 
     int bytesRead; 
     while ((bytesRead = input.read(buffer)) != -1) 
     { 
      output.write(buffer, 0, bytesRead); 
     } 
    } 

    /** 
    * Dataset should be placed in resources/ConcreteClusterMapReduceTest 
    * @return a name of a file from catalog. 
    * */ 
    protected abstract String getInputDatasetName(); 

    /** 
    * @return path reducer output 
    * default is: @{link AbstractClusterMapReduceTest#DEFAULT_OUTPUT_CATALOG} 
    * */ 
    protected String getOutputPath(){ 
     return "mr-data-output"; 
    } 

    /** 
    * Creates @{link Path} using absolute path to some FS resource 
    * @return new Path instance. 
    * */ 
    protected Path createPath(String pathToFSResource){ 
     return new Path(pathToFSResource); 
    } 

    /** 
    * Builds new instance of MiniDFSCluster 
    * Default: @{link DEFAULT_NAMENODE_PORT}, @{link DEFAULT_NAMENODE_PORT} 
    * @return MiniDFSCluster instance. 
    * */ 
    protected MiniDFSCluster buildMiniDFSCluster() throws IOException { 
     return new MiniDFSCluster.Builder(new Configuration()) 
        .nameNodePort(DEFAULT_NAMENODE_PORT) 
        .numDataNodes(ONE_DATANODE) 
        .build(); 
    } 

    protected abstract Class<? extends Configured> getMRJobClass(); 
    protected abstract Class<? extends Mapper> getMapperClass(); 
    protected abstract Class<? extends Reducer> getReducerClass(); 
    protected abstract Class<? extends InputFormat> getInputFormat(); 
    protected abstract Class<? extends OutputFormat> getOutputFormat(); 
    protected abstract Class<?> getOutputKeyClass(); 
    protected abstract Class<?> getOutputValueClass(); 

} 

而具體的測試子類:

public class POIClusterMapreduceTest extends AbstractClusterMapReduceTest{ 

    private static final String INTEGRATION = "integration"; 


    @BeforeClass(groups = INTEGRATION) 
    public void setup() throws IOException { 
     super.setupEnv(); 
    } 

    //@Test(groups = INTEGRATION) 
    public void runJob() throws InterruptedException, IOException, ClassNotFoundException { 
     boolean result = createAndSubmitJob(); 
     MatcherAssert.assertThat(result, Matchers.is(true)); 

     String outputResultAsString = getFileSystem().open(createPath(getOutputPath())).readUTF(); 
     MatcherAssert.assertThat(outputResultAsString.length(), Matchers.greaterThan(0)); 


    } 

    @AfterClass(groups = INTEGRATION) 
    public void tearDown(){ 
     super.tearDown(); 
    } 

    @Override 
    protected Class<Main> getMRJobClass() { 
     return Main.class; 
    } 

    @Override 
    protected Class<POIMapper> getMapperClass() { 
     return POIMapper.class; 
    } 

    @Override 
    protected Class<Reducer> getReducerClass() { 
     return Reducer.class; 
    } 

    @Override 
    protected Class<TextInputFormat> getInputFormat() { 
     return TextInputFormat.class; 
    } 

    @Override 
    protected Class<TextOutputFormat> getOutputFormat() { 
     return TextOutputFormat.class; 
    } 

    @Override 
    protected Class<LongWritable> getOutputKeyClass() { 
     return LongWritable.class; 
    } 

    @Override 
    protected Class<XVLRDataWritable> getOutputValueClass() { 
     return XVLRDataWritable.class; 
    } 

    @Override 
    protected String getInputDatasetName() { 
     return "mr-input-data"; 
    } 
} 
+0

看起來你缺少類路徑中的'hadoop-yarn-common-2.0.0-cdh4.x.x.jar'。你如何運作你的工作? – maksimov

+0

請參閱更新的問題。 – Sergey

回答

0

的問題是可怕的maven衝突。下面是正確的Maven依賴:

<dependencies> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-core</artifactId> 
      <version>${hadoop.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-common</artifactId> 
      <version>${hadoop.common.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-hdfs</artifactId> 
      <version>${hadoop.common.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-hdfs</artifactId> 
      <classifier>tests</classifier> 
      <version>${hadoop.common.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-test</artifactId> 
      <version>${hadoop.version}</version> 
     </dependency> 
    </dependencies> 

和:

<properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <hadoop.version>2.0.0-mr1-cdh4.1.1</hadoop.version> 
    <hadoop.common.version>2.0.0-cdh4.1.1</hadoop.common.version> 
</properties> 

這裏是基類來進行測試:

public abstract class AbstractClusterMRTest { 
    private static final Log LOG = LogFactory.getLog(AbstractClusterMRTest.class); 

    public static final String DEFAULT_LOG_CATALOG = "local-mr-logs"; 
    public static final String SLASH = "/"; 
    public static final String MR_DATA_OUTPUT = "mr-data-output"; 
    public static final String DEFAULT_OUTPUT_FILE_NAME = "part-r-00000"; 

    private static final int DEFAULT_REDUCE_NUM_TASKS = 1; 


    private Configuration configuration; 
    private FileSystem localFileSystem; 
    private MiniMRCluster mrCluster; 
    private JobConf mrClusterConf; 


    /** 
    * Always provide path to log catalog. 
    * */ 
    protected String getPathToLogCatalog(){ 
     File logCatalog = new File(getPathToOutputDirectory()+ SLASH + DEFAULT_LOG_CATALOG); 
     if(!logCatalog.exists()){ 
      logCatalog.mkdir(); 
     } 
     LOG.info("Path to log catalog is: "+logCatalog.getAbsolutePath()); 
     return logCatalog.getAbsolutePath(); 
    } 

    private String getPathToOutputDirectory(){ 
     return System.getProperty("project.build.directory"); 
    } 

    public void setup() throws IOException{ 
     System.setProperty("hadoop.log.dir", getPathToLogCatalog()); 

     configuration = new Configuration(true); 
     localFileSystem = FileSystem.get(configuration); 
     mrCluster = new MiniMRCluster(1, localFileSystem.getUri().toString(), 1, null, null, new JobConf(configuration)); 
     mrClusterConf = mrCluster.createJobConf(); 
    } 

    public void tearDown() { 
     if (mrCluster != null) { 
      mrCluster.shutdown(); 
      mrCluster = null; 
     } 
    } 

    /** 
    * Use this method to get JobBuilder configured for testing purposes. 
    * @return JobBuilder instance ready for further configuration. 
    * */ 
    public JobBuilder createTestableJobInstance() throws IOException{ 
     return new JobBuilder(mrClusterConf, this.getClass().getSimpleName()+"-mrjob") 
       .withNumReduceTasks(DEFAULT_REDUCE_NUM_TASKS); 
    } 

    /** 
    * Pass configured JobBuilder and wait for completion 
    * @param jobBuilder is a JobBuilder ready to submit 
    * @return job completion result. 
    * */ 
    public boolean buildSubmitAndWaitForCompletion(JobBuilder jobBuilder) 
         throws InterruptedException, IOException, ClassNotFoundException { 
     String pathToInputFile = getPathToInputData(); 
     checkThatFileExists(pathToInputFile); 

     Job job = jobBuilder.build(); 
     FileInputFormat.setInputPaths(job, pathToInputFile); 
     FileOutputFormat.setOutputPath(job, createPath(getOutputPath())); 

     LOG.info("Submitting job..."); 
     job.submit(); 
     LOG.info("Job has been submitted."); 

     String trackingUrl = job.getTrackingURL(); 
     String jobId = job.getJobID().toString(); 
     LOG.info("trackingUrl:" +trackingUrl); 
     LOG.info("jobId:" +jobId); 

     return job.waitForCompletion(true); 
    } 

    /** 
    * By declaration input data should be stored in test/resources folder: 
    * ConcreteTestClassName/in/getInputDatasetName() 
    * Don't forget to override @link{AbstractClusterMRTest#getInputDatasetName()} 
    * @return path to input data 
    * */ 
    protected String getPathToInputData(){ 
     String pathFile = this.getClass().getSimpleName() + SLASH + "in" + SLASH+ getInputDatasetName(); 
     LOG.info("Path for getting URL to file:" + pathFile); 
     URL urlToFile = this.getClass().getClassLoader().getResource(pathFile); 

     File file = FileUtils.toFile(urlToFile); 
     return file.getAbsolutePath(); 
    } 

    /** 
    * Dataset should be placed in resources/ConcreteClusterMapReduceTest 
    * @return a name of a file from catalog. 
    * */ 
    protected abstract String getInputDatasetName(); 

    /** 
    * @return path reducer output 
    * default is: @{link AbstractClusterMapReduceTestOld#DEFAULT_OUTPUT_CATALOG} 
    * */ 
    protected String getOutputPath(){ 
     return getPathToOutputDirectory()+ SLASH + MR_DATA_OUTPUT; 
    } 

    /** 
    * @return text lines from reducer output file. 
    * */ 
    protected List<String> getLinesFromOutputFile() throws IOException{ 
     String pathToResult = getOutputPath()+SLASH+DEFAULT_OUTPUT_FILE_NAME; 
     File resultFile = new File(pathToResult); 
     return FileUtils.readLines(resultFile); 
    } 

    public abstract String getEtalonOutputFileName(); 

    protected List<String> getLinesFromEtalonOutputFile() throws IOException{ 
     String pathFile = this.getClass().getSimpleName() + SLASH + "out" + SLASH+ getEtalonOutputFileName(); 
     LOG.debug("path to etalon file: "+ pathFile); 
     URL urlToFile = this.getClass().getClassLoader().getResource(pathFile); 

     File file = FileUtils.toFile(urlToFile); 
     return FileUtils.readLines(file); 

    } 


    /** 
    * Creates @{link Path} using absolute path to some FS resource 
    * @return new Path instance. 
    * */ 
    protected Path createPath(String pathToFSResource){ 
     return new Path(pathToFSResource); 
    } 

    public void checkThatFileExists(String absolutePathToFile){ 
     if(! new File(absolutePathToFile).exists()){ 
      throw new Error("Path to input file is incorrect. Can't run MR job. Incorrect path is:"+absolutePathToFile); 
     } 
    } 

} 

只是延長,創建你自己的具體測試類。從createTestableJobInstance獲取作業(Job有一個簡單的builder-wrapper)。配置它並調用buildSubmitAndWaitForCompletion