
I want to run two mappers that produce two different outputs in different directories. The output of the first mapper (passed as an argument) should be fed in as the input of the second mapper. This is what I have in the driver class (using multiple mappers with multiple output directories in Hadoop MapReduce):

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 


public class Export_Column_Mapping 
{ 
    // NOTE: map tasks run in their own JVMs, so these static fields are not 
    // shared between the two jobs (or even between tasks of the same job) 
    private static String[] Detail_output_column_array = new String[27]; 
    private static String[] Shop_output_column_array = new String[8]; 
    private static String details_output = null; 
    private static String Shop_output = null; 

    public static void main(String[] args) throws Exception 
    { 

     String Output_filetype = args[3]; 
     String Input_column_number = args[4]; 
     String Output_column_number = args[5]; 

     Configuration Detailsconf = new Configuration(false); // 'false' = do not load the default *-site.xml resources 

     Detailsconf.setStrings("output_filetype",Output_filetype); 
     Detailsconf.setStrings("Input_column_number",Input_column_number); 
     Detailsconf.setStrings("Output_column_number",Output_column_number); 

     Job Details = new Job(Detailsconf, "Export_Column_Mapping"); 

     Details.setJarByClass(Export_Column_Mapping.class); 
     Details.setJobName("DetailsFile_Job"); 

     Details.setMapperClass(DetailFile_Mapper.class); 
     Details.setNumReduceTasks(0); 

     Details.setInputFormatClass(TextInputFormat.class); 
     Details.setOutputFormatClass(TextOutputFormat.class); 

     FileInputFormat.setInputPaths(Details, new Path(args[0])); 
     FileOutputFormat.setOutputPath(Details, new Path(args[1])); 

     if(Details.waitForCompletion(true)) 
     { 

     Configuration Shopconf = new Configuration(); 

     Job Shop = new Job(Shopconf,"Export_Column_Mapping"); 
     Shop.setJarByClass(Export_Column_Mapping.class); 
     Shop.setJobName("ShopFile_Job"); 

     Shop.setMapperClass(ShopFile_Mapper.class); 
     Shop.setNumReduceTasks(0); 

     Shop.setInputFormatClass(TextInputFormat.class); 
     Shop.setOutputFormatClass(TextOutputFormat.class); 

     FileInputFormat.setInputPaths(Shop, new Path(args[1])); 
     FileOutputFormat.setOutputPath(Shop, new Path(args[2])); 

     MultipleOutputs.addNamedOutput(Shop, "text", TextOutputFormat.class, LongWritable.class, Text.class); // declared but never written to in ShopFile_Mapper 
     System.exit(Shop.waitForCompletion(true) ? 0 : 1); 
     } 
    } 

    public static class DetailFile_Mapper extends Mapper<LongWritable,Text,Text,Text> 
    { 
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
     { 
      String str_Output_filetype = context.getConfiguration().get("output_filetype"); 

      String str_Input_column_number = context.getConfiguration().get("Input_column_number"); 
      String[] input_columns_number = str_Input_column_number.split(","); 

      String str_Output_column_number= context.getConfiguration().get("Output_column_number");  
      String[] output_columns_number = str_Output_column_number.split(","); 

      String str_line = value.toString(); 
      String[] input_column_array = str_line.split(","); 

      try 
      { 

       // iterate over the column-number mappings supplied via the configuration 
       for(int i = 0; i < input_columns_number.length; i++) 
       { 
        int int_outputcolumn = Integer.parseInt(output_columns_number[i]); 
        int int_inputcolumn = Integer.parseInt(input_columns_number[i]); 

        if((int_inputcolumn != 0) && (int_outputcolumn != 0) && output_columns_number.length == input_columns_number.length) 
        { 

         Detail_output_column_array[int_outputcolumn-1] = input_column_array[int_inputcolumn-1]; 


         if(details_output != null) 
         { 
          details_output = details_output+"  "+ Detail_output_column_array[int_outputcolumn-1]; 
          Shop_output = Shop_output+"  "+ Shop_output_column_array[int_outputcolumn-1]; 

         }else 
         { 
          details_output = Detail_output_column_array[int_outputcolumn-1]; 
          Shop_output = Shop_output_column_array[int_outputcolumn-1]; 

         } 
        } 
       } 

      }catch (Exception e) 
      { 
       // ignore malformed or incomplete rows 
      } 
      if(details_output != null) 
      { 
       context.write(null, new Text(details_output)); 
      } 
     } 
    } 
    public static class ShopFile_Mapper extends Mapper<LongWritable,Text,Text,Text> 
    { 
     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
     { 
      try 
      { 

       // Columns 0-3 plus column 14 of the detail output make up the shop 
       // record; these assignments do not depend on the loop index, so they 
       // are done once, before the loop 
       Shop_output_column_array[0] = Detail_output_column_array[0]; 
       Shop_output_column_array[1] = Detail_output_column_array[1]; 
       Shop_output_column_array[2] = Detail_output_column_array[2]; 
       Shop_output_column_array[3] = Detail_output_column_array[3]; 
       Shop_output_column_array[4] = Detail_output_column_array[14]; 

       for(int i = 0; i < Shop_output_column_array.length; i++) 
       { 
        if(Shop_output != null) 
        { 
         Shop_output = Shop_output + "  " + Shop_output_column_array[i]; 
        } 
        else 
        { 
         Shop_output = Shop_output_column_array[i]; 
        } 
       } 
      }catch (Exception e){ 
       // ignore rows that cannot be mapped 
      } 
      if(Shop_output != null) 
      { 
       context.write(null, new Text(Shop_output)); 
      } 
     } 
    } 

} 

The error I get with this code:

Error:org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: file:/home/Barath.B.Natarajan.ap/rules/text.txt

I want to run the jobs one by one. Can anyone help me with this?

Answer


There is something called JobControl with which you will be able to achieve this.

Suppose there are two jobs, A and B. Initialise the exit status first, then wire B to depend on A:

int code = 1;   // exit status: stays 1 until all jobs have succeeded 

ControlledJob A = new ControlledJob(/* Configuration for job A */); 
ControlledJob B = new ControlledJob(/* Configuration for job B */); 
B.addDependingJob(A);   // B starts only after A completes successfully 

JobControl jControl = new JobControl("Name"); 
jControl.addJob(A); 
jControl.addJob(B); 

Thread runJControl = new Thread(jControl); 
runJControl.start(); 

while (!jControl.allFinished()) { 
    code = jControl.getFailedJobList().size() == 0 ? 0 : 1; 
    Thread.sleep(1000); 
} 
System.exit(code); 

In your case, the first job would be the first mapper with zero reducers and the second job the second mapper with zero reducers, configured so that the input path of B is the same as the output path of A.
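
Applied to the question's code, a minimal driver sketch might look like the following. This is a sketch, not a drop-in replacement: the class name ChainedDriver is made up, it is assumed to sit in the same package as Export_Column_Mapping so the mapper classes resolve, and it keeps the question's argument order. Note that each ControlledJob wraps a fully configured Job, so the input and output paths must be set before the jobs are handed to JobControl:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; 
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class ChainedDriver 
{ 
    public static void main(String[] args) throws Exception 
    { 
     Configuration conf = new Configuration(); 
     conf.setStrings("output_filetype", args[3]); 
     conf.setStrings("Input_column_number", args[4]); 
     conf.setStrings("Output_column_number", args[5]); 

     // first job: detail mapper, zero reducers 
     Job details = Job.getInstance(conf, "DetailsFile_Job"); 
     details.setJarByClass(Export_Column_Mapping.class); 
     details.setMapperClass(Export_Column_Mapping.DetailFile_Mapper.class); 
     details.setNumReduceTasks(0); 
     FileInputFormat.setInputPaths(details, new Path(args[0])); 
     FileOutputFormat.setOutputPath(details, new Path(args[1])); 

     // second job: shop mapper, zero reducers, reads the first job's output 
     Job shop = Job.getInstance(conf, "ShopFile_Job"); 
     shop.setJarByClass(Export_Column_Mapping.class); 
     shop.setMapperClass(Export_Column_Mapping.ShopFile_Mapper.class); 
     shop.setNumReduceTasks(0); 
     FileInputFormat.setInputPaths(shop, new Path(args[1])); 
     FileOutputFormat.setOutputPath(shop, new Path(args[2])); 

     // wrap both jobs; shop may only start once details has succeeded 
     ControlledJob cDetails = new ControlledJob(details.getConfiguration()); 
     ControlledJob cShop = new ControlledJob(shop.getConfiguration()); 
     cShop.addDependingJob(cDetails); 

     JobControl jControl = new JobControl("Export_Column_Mapping"); 
     jControl.addJob(cDetails); 
     jControl.addJob(cShop); 

     int code = 1; 
     Thread runJControl = new Thread(jControl); 
     runJControl.start(); 
     while (!jControl.allFinished()) 
     { 
      code = jControl.getFailedJobList().size() == 0 ? 0 : 1; 
      Thread.sleep(1000); 
     } 
     jControl.stop(); 
     System.exit(code); 
    } 
} 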


**code = jControl.getFailedJobList().size() == 0 ? 0 : 1;** Can you explain what code is? – Barath


I get this error while running this, even though I have set all the required parameters: **Error: org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set** – Barath


Just initialise int code = 1 at the start. – madhu
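
A note on the InvalidJobConfException above: JobControl only schedules jobs, it does not configure them, so a plausible cause (an assumption, since the failing driver is not shown) is wrapping a Configuration in a ControlledJob before FileOutputFormat.setOutputPath was called on the corresponding Job. Roughly (same imports as the sketch above):

Job jobA = Job.getInstance(new Configuration(), "A"); 
// ... set mapper, input paths, etc. ... 
FileOutputFormat.setOutputPath(jobA, new Path(args[1])); // set *before* wrapping 
ControlledJob A = new ControlledJob(jobA.getConfiguration()); // conf now carries the output dir 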
