
How do I pass an ArrayList as a value from the Mapper to the Reducer in MapReduce?

My code basically applies a set of rules and creates new values (strings) according to those rules. I keep all of the output generated after the rules run in a list, and I now need to send this output (the mapper's value) to the reducer, but I have no way to do it.

Can someone please point me in the right direction?

Code added:

package develop; 

import java.io.BufferedReader; 
import java.io.FileReader; 
import java.io.IOException; 
import java.net.URI; 
import java.net.URISyntaxException; 
import java.util.ArrayList; 
import java.util.LinkedHashMap; 
import java.util.List; 
import java.util.Map; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 

import utility.RulesExtractionUtility; 

public class CustomMap{ 


    public static class CustomerMapper extends Mapper<Object, Text, Text, Text> { 
     private Map<String, String> rules; 
     @Override 
     public void setup(Context context) 
     { 

      try 
      { 
       URI[] cacheFiles = context.getCacheFiles(); 
       setupRulesMap(cacheFiles[0].toString()); 
      } 
      catch (IOException ioe) 
      { 
       System.err.println("Error reading rules file from the distributed cache."); 
       System.exit(1); 
      } 

     } 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 

//   Map<String, String> rules = new LinkedHashMap<String, String>();
//   rules.put("targetcolumn[1]", "ASSIGN(source[0])");
//   rules.put("targetcolumn[2]", "INCOME(source[2]+source[3])");
//   rules.put("targetcolumn[3]", "ASSIGN(source[1])");

//   The "rules" map above would basically create some list values from the source file.

      String[] splitSource = value.toString().split(" ");

      List<String> lists = RulesExtractionUtility.rulesEngineExecutor(splitSource, rules);

//   For each line of a huge text file, lists holds values such as (name, age).
//   This is what I want to write to the context and pass to the reducer.
//   I have not implemented the reducer yet, as I am stuck on passing the value from the mapper:

//   context.write(new Text(), lists); // <-- I do not have a way of doing this, since List is not a Writable


     } 




     private void setupRulesMap(String filename) throws IOException 
     { 
      // Each line of the cached rules file has the form "target=expression". 
      Map<String, String> rule = new LinkedHashMap<String, String>(); 
      BufferedReader reader = new BufferedReader(new FileReader(filename)); 
      String line = reader.readLine(); 
      while (line != null) 
      { 
       String[] split = line.split("="); 
       rule.put(split[0], split[1]); 
       line = reader.readLine(); 
      } 
      reader.close(); 
      rules = rule; 
     } 
    } 
    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException, URISyntaxException { 


    Configuration conf = new Configuration(); 
    if (args.length != 2) { 
     System.err.println("Usage: customerMapper <in> <out>"); 
     System.exit(2); 
    } 
    Job job = Job.getInstance(conf); 
    job.setJarByClass(CustomMap.class); 
    job.setMapperClass(CustomerMapper.class); 
    job.addCacheFile(new URI("Some HDFS location")); 


    URI[] cacheFiles= job.getCacheFiles(); 
    if(cacheFiles != null) { 
     for (URI cacheFile : cacheFiles) { 
      System.out.println("Cache file ->" + cacheFile); 
     } 
    } 
    // job.setReducerClass(Reducer.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 

Answers

0

To pass an ArrayList from the mapper to the reducer, the object obviously has to implement the Writable interface. Why don't you try this library?

<dependency> 
    <groupId>org.apache.giraph</groupId> 
    <artifactId>giraph-core</artifactId> 
    <version>1.1.0-hadoop2</version> 
</dependency> 

It has an abstract class:

public abstract class ArrayListWritable<M extends org.apache.hadoop.io.Writable> 
extends ArrayList<M> 
implements org.apache.hadoop.io.Writable, org.apache.hadoop.conf.Configurable 

You can create your own class that fills in the abstract methods and implements the interface methods with your own code. For example:

public class MyListWritable extends ArrayListWritable<Text>{ 
    ... 
} 
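
A minimal sketch of such a subclass, assuming (from memory of the Giraph 1.1.0 source, so verify against the version you actually use) that ArrayListWritable lives in org.apache.giraph.utils and leaves an abstract setClass() hook for registering the element type:

import org.apache.giraph.utils.ArrayListWritable; 
import org.apache.hadoop.io.Text; 

public class MyListWritable extends ArrayListWritable<Text> { 

    // Writables need a no-arg constructor so Hadoop can create an 
    // instance during deserialization. 
    public MyListWritable() { 
        super(); 
    } 

    // Register Text as the element type so the elements can be 
    // instantiated when the list is read back. 
    @Override 
    public void setClass() { 
        setClass(Text.class); 
    } 
} 

The driver would then declare job.setMapOutputValueClass(MyListWritable.class) so the mapper can emit instances of it as values.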
0

One way to do this (probably not the only way, nor the best one) would be to:

  1. serialize your list into a string in order to pass it as the output value of the mapper,

  2. deserialize that string when you read the value in the reducer.

If you do this, you should also strip from the serialized string any special symbols that would break rebuilding the list from the string when you read the input value (symbols such as \n or \t, for example). An easy way to achieve that is to use base64-encoded strings. A minimal sketch of this approach follows.
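
Here is a minimal sketch of that idea (not from the original answer). Each element is base64-encoded individually, so the comma delimiter can never appear inside an element. It assumes Java 8's java.util.Base64; on older JDKs, the Base64 class from commons-codec, which ships with Hadoop, works the same way. The class name ListCodec is made up for illustration:

import java.nio.charset.StandardCharsets; 
import java.util.ArrayList; 
import java.util.Base64; 
import java.util.List; 

public class ListCodec { 

    // Mapper side: turn the list into a single delimiter-safe string. 
    public static String encode(List<String> values) { 
        StringBuilder sb = new StringBuilder(); 
        for (String v : values) { 
            if (sb.length() > 0) { 
                sb.append(','); 
            } 
            sb.append(Base64.getEncoder() 
                    .encodeToString(v.getBytes(StandardCharsets.UTF_8))); 
        } 
        return sb.toString(); 
    } 

    // Reducer side: rebuild the list from the encoded string. 
    public static List<String> decode(String encoded) { 
        List<String> values = new ArrayList<String>(); 
        for (String token : encoded.split(",")) { 
            if (!token.isEmpty()) { 
                values.add(new String(Base64.getDecoder().decode(token), 
                        StandardCharsets.UTF_8)); 
            } 
        } 
        return values; 
    } 
} 

The mapper would then emit new Text(ListCodec.encode(lists)) as its output value, and the reducer would call ListCodec.decode(value.toString()) to rebuild the list.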

+1

Thanks for the suggestion. Wouldn't it also be possible to go with a custom Writable? – user72

+0

Yes, most likely. But I believe a custom Writable would be implemented in the same way I described in my answer. So if you are just looking for a solution to your problem, mine will definitely work. – Yann
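
(For reference, a minimal sketch of that custom-Writable route, not part of the original thread: Hadoop ships org.apache.hadoop.io.ArrayWritable, which can be subclassed to fix the element type to Text.)

import org.apache.hadoop.io.ArrayWritable; 
import org.apache.hadoop.io.Text; 

public class TextArrayWritable extends ArrayWritable { 

    // The no-arg constructor is required so Hadoop can instantiate the 
    // Writable during deserialization; fixing the element class to Text 
    // lets it instantiate the elements as well. 
    public TextArrayWritable() { 
        super(Text.class); 
    } 

    public TextArrayWritable(Text[] values) { 
        super(Text.class, values); 
    } 
} 

The driver would also need job.setMapOutputValueClass(TextArrayWritable.class), since the map output value class would then differ from the job's final output value class.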

+0

Do you have some examples/links I could refer to while trying to implement your suggestion? Thanks – user72

-2

You should send a Text object instead of a String object. You can then call object.toString() in the Reducer. Be sure to configure your driver properly. A short sketch is given below.

If you post your code, we will help you further.
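
A minimal sketch of that suggestion (not from the original post). Inside the question's map() method, the list is flattened into one tab-delimited Text value; the choice of splitSource[0] as the key is an assumption for illustration. Note that this breaks if an element itself contains a tab, which is why the previous answer recommends base64 encoding:

      // Flatten the list into one tab-delimited Text value and emit it. 
      StringBuilder joined = new StringBuilder(); 
      for (String item : lists) { 
          if (joined.length() > 0) { 
              joined.append('\t'); 
          } 
          joined.append(item); 
      } 
      context.write(new Text(splitSource[0]), new Text(joined.toString())); 

A matching reducer, nested in CustomMap like the mapper (it additionally requires import org.apache.hadoop.mapreduce.Reducer;), would rebuild the list from the string:

    public static class CustomerReducer extends Reducer<Text, Text, Text, Text> { 
        @Override 
        protected void reduce(Text key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException { 
            for (Text value : values) { 
                // Rebuild the list from the tab-delimited string. 
                String[] items = value.toString().split("\t"); 
                // ... reduce-side logic over items goes here ... 
                context.write(key, value); 
            } 
        } 
    } 

The driver would then register it with job.setReducerClass(CustomerReducer.class) in place of the commented-out line.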

+1

I have added the code along with some comments to make the goal a bit clearer. – user72
