
How do I convert this old-API MapReduce job code to the new API?

The code below comes from Alex Holmes' Hadoop in Practice, Second Edition (link: https://github.com/alexholmes/hiped2/tree/master/src/main/java/hip/ch5/http).

In this MapReduce job, the mapper reads URLs from a list in a text file, sends an HTTP request for each one, and stores the response body in a file.

However, this code is written against the old MapReduce API, and I want to convert it to the new API. Changing JobConf to Job + Configuration and extending the new Mapper should be simple enough, but for some reason I cannot get it to work with my code.

I would rather post my modified code later to avoid clutter; the original code is given below.

Mapper code:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IOUtils; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.Mapper; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reporter; 

import java.io.IOException; 
import java.io.InputStream; 
import java.io.OutputStream; 
import java.net.URL; 
import java.net.URLConnection; 

public final class HttpDownloadMap
    implements Mapper<LongWritable, Text, Text, Text> {

    private int file = 0;
    private Configuration conf;
    private String jobOutputDir;
    private String taskId;
    private int connTimeoutMillis = DEFAULT_CONNECTION_TIMEOUT_MILLIS;
    private int readTimeoutMillis = DEFAULT_READ_TIMEOUT_MILLIS;
    private final static int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
    private final static int DEFAULT_READ_TIMEOUT_MILLIS = 5000;

    public static final String CONN_TIMEOUT =
        "httpdownload.connect.timeout.millis";

    public static final String READ_TIMEOUT =
        "httpdownload.read.timeout.millis";

    @Override
    public void configure(JobConf job) {
        conf = job;
        jobOutputDir = job.get("mapred.output.dir");
        taskId = conf.get("mapred.task.id");

        if (conf.get(CONN_TIMEOUT) != null) {
            connTimeoutMillis = Integer.valueOf(conf.get(CONN_TIMEOUT));
        }
        if (conf.get(READ_TIMEOUT) != null) {
            readTimeoutMillis = Integer.valueOf(conf.get(READ_TIMEOUT));
        }
    }

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
        Path httpDest =
            new Path(jobOutputDir, taskId + "_http_" + (file++));

        InputStream is = null;
        OutputStream os = null;
        try {
            URLConnection connection =
                new URL(value.toString()).openConnection();
            connection.setConnectTimeout(connTimeoutMillis);
            connection.setReadTimeout(readTimeoutMillis);
            is = connection.getInputStream();

            os = FileSystem.get(conf).create(httpDest);

            IOUtils.copyBytes(is, os, conf, true);
        } finally {
            IOUtils.closeStream(is);
            IOUtils.closeStream(os);
        }

        output.collect(new Text(httpDest.toString()), value);
    }

    @Override
    public void close() throws IOException {
    }
}

Job runner / driver code:

import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 

public final class HttpDownloadMapReduce {

    public static void main(String... args) throws Exception {
        runJob(args[0], args[1]);
    }

    public static void runJob(String src, String dest)
        throws Exception {
        JobConf job = new JobConf();
        job.setJarByClass(HttpDownloadMap.class);

        FileSystem fs = FileSystem.get(job);
        Path destination = new Path(dest);

        fs.delete(destination, true);

        job.setMapperClass(HttpDownloadMap.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, src);
        FileOutputFormat.setOutputPath(job, destination);

        JobClient.runJob(job);
    }
}

Run configuration:

args[0] = "testData/input/urls.txt" 
args[1] = "testData/output" 

urls.txt contains:

http://www.google.com 
http://www.yahoo.com 

Mapper class: http://textuploader.com/76sx – gkc123


Job runner class: http://textuploader.com/76si - the code links will expire in 1 year. – gkc123

Answer


Try these changes (a sketch of the converted code follows this list):

  1. Import from the org.apache.hadoop.mapreduce package instead of the org.apache.hadoop.mapred one.

  2. Replace the old OutputCollector and Reporter with Context; the new API uses the Context object for writes.

  3. Change JobClient to Job, and JobConf to Configuration.
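
Putting those changes together, here is a minimal, untested sketch of what the converted mapper could look like, assuming Hadoop 2.x. The deprecated property names mapred.output.dir and mapred.task.id are kept from the original; Hadoop 2 still resolves them, though the newer names are mapreduce.output.fileoutputformat.outputdir and mapreduce.task.attempt.id.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;

// New API: extend org.apache.hadoop.mapreduce.Mapper instead of
// implementing org.apache.hadoop.mapred.Mapper.
public final class HttpDownloadMap
    extends Mapper<LongWritable, Text, Text, Text> {

    private static final int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
    private static final int DEFAULT_READ_TIMEOUT_MILLIS = 5000;

    public static final String CONN_TIMEOUT =
        "httpdownload.connect.timeout.millis";
    public static final String READ_TIMEOUT =
        "httpdownload.read.timeout.millis";

    private int file = 0;
    private Configuration conf;
    private String jobOutputDir;
    private String taskId;
    private int connTimeoutMillis;
    private int readTimeoutMillis;

    // setup() replaces configure(JobConf); the Configuration now
    // comes from the Context.
    @Override
    protected void setup(Context context) {
        conf = context.getConfiguration();
        jobOutputDir = conf.get("mapred.output.dir");
        taskId = conf.get("mapred.task.id");
        connTimeoutMillis =
            conf.getInt(CONN_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT_MILLIS);
        readTimeoutMillis =
            conf.getInt(READ_TIMEOUT, DEFAULT_READ_TIMEOUT_MILLIS);
    }

    // map() now takes a Context instead of OutputCollector/Reporter,
    // and may also throw InterruptedException.
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        Path httpDest =
            new Path(jobOutputDir, taskId + "_http_" + (file++));

        InputStream is = null;
        OutputStream os = null;
        try {
            URLConnection connection =
                new URL(value.toString()).openConnection();
            connection.setConnectTimeout(connTimeoutMillis);
            connection.setReadTimeout(readTimeoutMillis);
            is = connection.getInputStream();
            os = FileSystem.get(conf).create(httpDest);
            // copyBytes with close=true closes both streams on success.
            IOUtils.copyBytes(is, os, conf, true);
        } finally {
            IOUtils.closeStream(is);
            IOUtils.closeStream(os);
        }

        // context.write() replaces output.collect().
        context.write(new Text(httpDest.toString()), value);
    }
}

And the corresponding driver; on releases older than 2.x, new Job(conf) would replace Job.getInstance(conf):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public final class HttpDownloadMapReduce {

    public static void main(String... args) throws Exception {
        runJob(args[0], args[1]);
    }

    public static void runJob(String src, String dest)
        throws Exception {
        // Job + Configuration replace JobConf.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(HttpDownloadMap.class);

        // Delete any previous output, as in the original.
        FileSystem fs = FileSystem.get(conf);
        Path destination = new Path(dest);
        fs.delete(destination, true);

        job.setMapperClass(HttpDownloadMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // The new-API FileInputFormat/FileOutputFormat live under
        // mapreduce.lib.input and mapreduce.lib.output.
        FileInputFormat.setInputPaths(job, src);
        FileOutputFormat.setOutputPath(job, destination);

        // waitForCompletion() replaces JobClient.runJob().
        if (!job.waitForCompletion(true)) {
            throw new Exception("Job failed");
        }
    }
}

One common reason a conversion like this fails is mixing imports from the two packages, e.g. using the old mapred.FileInputFormat with the new Job, so double-check that every MapReduce import comes from org.apache.hadoop.mapreduce.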


Hi Nilay, I modified the code following your suggestions, but I could not get it to work. Could you help me debug it? The code is uploaded to these external links. Mapper class: http://www.textuploader.com/76sx & Job Runner class: http://www.textuploader.com/76si – gkc123
