Hadoop streaming with a Java Mapper/Reducer

I'm trying to run a Hadoop streaming job over some Wikipedia dumps (in compressed bz2 format) with a Java Mapper/Reducer. I'm trying to use WikiHadoop, an interface recently released by Wikimedia.

WikiReader_Mapper.java

package courseproj.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Mapper: emits ("article count", 1) for every article occurrence.
public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {

    // Reuse objects to save the overhead of object creation.
    private final static Text KEY = new Text();
    private final static IntWritable VALUE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {
        KEY.set("article count");
        collector.collect(KEY, VALUE);
    }
}

WikiReader_Reducer.java

package courseproj.example;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Reducer: sums up all the counts.
public class WikiReader_Reducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    // Reuse the output value object across calls.
    private final static IntWritable SUM = new IntWritable();

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector,
            Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        SUM.set(sum);
        collector.collect(key, SUM);
    }
}

The command I'm running is

hadoop jar lib/hadoop-streaming-2.0.0-cdh4.2.0.jar \ 
     -libjars lib2/wikihadoop-0.2.jar \ 
     -D mapreduce.input.fileinputformat.split.minsize=300000000 \ 
     -D mapreduce.task.timeout=6000000 \ 
     -D org.wikimedia.wikihadoop.previousRevision=false \ 
     -input enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 \ 
     -output out \ 
     -inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \ 
     -mapper WikiReader_Mapper \ 
     -reducer WikiReader_Reducer 

and the error message I get is

Error: java.lang.RuntimeException: Error in configuring object 
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106) 
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:72) 
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:130) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:424) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340) 
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:157) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408) 
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:152) 

Caused by: java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:103) 

Caused by: java.io.IOException: Cannot run program "WikiReader_Mapper": java.io.IOException: error=2, No such file or directory 
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:460) 
    at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209) 

I'm more familiar with the new Hadoop API than with the old one. Since my mapper and reducer code sit in two different files, where do I define the JobConf configuration parameters for my job while still following the command structure of hadoop streaming (setting the mapper and reducer classes explicitly)? Is there a way to wrap the mapper and reducer code into a single class (one that extends Configured and implements Tool, the way it's done in the new API) and pass that class name to the hadoop streaming command line, rather than setting the map and reduce classes separately?
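
Something along these lines is what I have in mind, as a rough sketch against the old (org.apache.hadoop.mapred) API that streaming uses. WikiReaderJob is a made-up name here, and I'm assuming StreamWikiDumpInputFormat implements the old InputFormat interface (it must, since streaming's -inputformat flag accepts it):

package courseproj.example;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.wikimedia.wikihadoop.StreamWikiDumpInputFormat;

// Hypothetical single-class driver: wires the mapper and reducer
// together through a JobConf instead of going through streaming.
public class WikiReaderJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), WikiReaderJob.class);
        conf.setJobName("wiki article count");

        // The same classes the streaming command names, set programmatically.
        conf.setMapperClass(WikiReader_Mapper.class);
        conf.setReducerClass(WikiReader_Reducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setInputFormat(StreamWikiDumpInputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options (-D key=value, -libjars, ...)
        // into the Configuration before run() is called.
        System.exit(ToolRunner.run(new WikiReaderJob(), args));
    }
}

If that works, the -D options from my streaming command (split size, timeout, previousRevision) could presumably be passed straight through on the command line, since ToolRunner handles generic options.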

Answer

Streaming uses the old API (org.apache.hadoop.mapred), but your mapper and reducer classes extend the new API's classes (org.apache.hadoop.mapreduce).

Try changing your mapper to implement org.apache.hadoop.mapred.Mapper and your reducer to implement org.apache.hadoop.mapred.Reducer, for example:

package courseproj.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Mapper: emits ("article count", 1) for every article occurrence.
// Extending MapReduceBase supplies the no-op configure() and close()
// methods that the old Mapper interface requires.
public class WikiReader_Mapper extends MapReduceBase implements Mapper<Text, Text, Text, IntWritable> {

    // Reuse objects to save the overhead of object creation.
    private final static Text KEY = new Text();
    private final static IntWritable VALUE = new IntWritable(1);

    @Override
    public void map(Text key, Text value, OutputCollector<Text, IntWritable> collector, Reporter reporter)
            throws IOException {
        KEY.set("article count");
        collector.collect(KEY, VALUE);
    }
}

I applied the changes you suggested. Thanks, I didn't know hadoop streaming only works with the old API. I'm still getting an error message related to the JobConf configuration, though (I've updated the question with the new error message). –

hadoop streaming assumes the mapper and reducer arguments are executables, but you've passed it Java classes. Why are you using hadoop streaming at all if your map and reduce implementations are written in Java? –

See this page for information on using Java classes as the mapper/reducer implementations: http://hadoop.apache.org/docs/r1.1.2/streaming.html#Specifying+a+Java+Class+as+the+Mapper%2FReducer –
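
For example (an illustrative sketch, not from the original post): if the compiled mapper and reducer classes are packaged into a jar, called wikireader.jar here purely for illustration, and shipped with -libjars so the task JVMs can load them, the invocation above would become something like

hadoop jar lib/hadoop-streaming-2.0.0-cdh4.2.0.jar \
     -libjars lib2/wikihadoop-0.2.jar,wikireader.jar \
     -D mapreduce.input.fileinputformat.split.minsize=300000000 \
     -D mapreduce.task.timeout=6000000 \
     -D org.wikimedia.wikihadoop.previousRevision=false \
     -input enwiki-latest-pages-articles10.xml-p000925001p001325000.bz2 \
     -output out \
     -inputformat org.wikimedia.wikihadoop.StreamWikiDumpInputFormat \
     -mapper courseproj.example.WikiReader_Mapper \
     -reducer courseproj.example.WikiReader_Reducer

with the mapper and reducer given as fully qualified class names rather than bare executable names.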
