The following code comes from Alex Holmes' Hadoop in Practice, Version 2 — link: https://github.com/alexholmes/hiped2/tree/master/src/main/java/hip/ch5/http. How can I convert this old-API MapReduce job code to the new MapReduce API?
In this MapReduce code, the mapper reads URLs from a list in a text file, sends an HTTP request for each one, and stores the body content in a text file.
However, this code is written against the old mapred API, and I want to convert it to the new mapreduce API. Changing JobConf to Job + Configuration and extending the new Mapper class should be simple, but for some reason I cannot get it to work with my code.
I will hold off on posting my modified code to avoid confusion; the original code is as follows:
Mapper code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
public final class HttpDownloadMap
implements Mapper<LongWritable, Text, Text, Text> {
private int file = 0;
private Configuration conf;
private String jobOutputDir;
private String taskId;
private int connTimeoutMillis =
DEFAULT_CONNECTION_TIMEOUT_MILLIS;
private int readTimeoutMillis = DEFAULT_READ_TIMEOUT_MILLIS;
private final static int DEFAULT_CONNECTION_TIMEOUT_MILLIS = 5000;
private final static int DEFAULT_READ_TIMEOUT_MILLIS = 5000;
public static final String CONN_TIMEOUT =
"httpdownload.connect.timeout.millis";
public static final String READ_TIMEOUT =
"httpdownload.read.timeout.millis";
@Override
public void configure(JobConf job) {
conf = job;
jobOutputDir = job.get("mapred.output.dir");
taskId = conf.get("mapred.task.id");
if (conf.get(CONN_TIMEOUT) != null) {
connTimeoutMillis = Integer.valueOf(conf.get(CONN_TIMEOUT));
}
if (conf.get(READ_TIMEOUT) != null) {
readTimeoutMillis = Integer.valueOf(conf.get(READ_TIMEOUT));
}
}
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
Path httpDest =
new Path(jobOutputDir, taskId + "_http_" + (file++));
InputStream is = null;
OutputStream os = null;
try {
URLConnection connection =
new URL(value.toString()).openConnection();
connection.setConnectTimeout(connTimeoutMillis);
connection.setReadTimeout(readTimeoutMillis);
is = connection.getInputStream();
os = FileSystem.get(conf).create(httpDest);
IOUtils.copyBytes(is, os, conf, true);
} finally {
IOUtils.closeStream(is);
IOUtils.closeStream(os);
}
output.collect(new Text(httpDest.toString()), value);
}
@Override
public void close() throws IOException {
}
}
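For comparison, here is a minimal sketch of what the mapper might look like ported to the new org.apache.hadoop.mapreduce API (untested; assumes Hadoop 2.x). The key changes: the class extends Mapper instead of implementing the old interface, configure(JobConf) becomes setup(Context), and OutputCollector/Reporter are replaced by the single Context object, which also exposes the output directory and task attempt ID that were previously read from mapred.output.dir and mapred.task.id:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// New-API version: extend Mapper rather than implementing the old interface.
public final class HttpDownloadMap
    extends Mapper<LongWritable, Text, Text, Text> {

  public static final String CONN_TIMEOUT =
      "httpdownload.connect.timeout.millis";
  public static final String READ_TIMEOUT =
      "httpdownload.read.timeout.millis";

  private int file = 0;
  private Configuration conf;
  private String jobOutputDir;
  private String taskId;
  private int connTimeoutMillis;
  private int readTimeoutMillis;

  // setup(Context) replaces the old configure(JobConf).
  @Override
  protected void setup(Context context) {
    conf = context.getConfiguration();
    // Context replaces the raw mapred.output.dir / mapred.task.id lookups.
    jobOutputDir = FileOutputFormat.getOutputPath(context).toString();
    taskId = context.getTaskAttemptID().toString();
    connTimeoutMillis = conf.getInt(CONN_TIMEOUT, 5000);
    readTimeoutMillis = conf.getInt(READ_TIMEOUT, 5000);
  }

  // map() writes through Context instead of OutputCollector/Reporter.
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    Path httpDest =
        new Path(jobOutputDir, taskId + "_http_" + (file++));
    InputStream is = null;
    OutputStream os = null;
    try {
      URLConnection connection =
          new URL(value.toString()).openConnection();
      connection.setConnectTimeout(connTimeoutMillis);
      connection.setReadTimeout(readTimeoutMillis);
      is = connection.getInputStream();
      os = FileSystem.get(conf).create(httpDest);
      IOUtils.copyBytes(is, os, conf, true);
    } finally {
      IOUtils.closeStream(is);
      IOUtils.closeStream(os);
    }
    context.write(new Text(httpDest.toString()), value);
  }
}
```

Note that FileOutputFormat here must be the one from org.apache.hadoop.mapreduce.lib.output, not the old org.apache.hadoop.mapred package.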
Job runner / driver code:
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public final class HttpDownloadMapReduce {
public static void main(String... args) throws Exception {
runJob(args[0], args[1]);
}
public static void runJob(String src, String dest)
throws Exception {
JobConf job = new JobConf();
job.setJarByClass(HttpDownloadMap.class);
FileSystem fs = FileSystem.get(job);
Path destination = new Path(dest);
fs.delete(destination, true);
job.setMapperClass(HttpDownloadMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, src);
FileOutputFormat.setOutputPath(job, destination);
JobClient.runJob(job);
}
}
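Assuming the mapper has been converted to extend org.apache.hadoop.mapreduce.Mapper, a sketch of the driver using Job + Configuration might look like this (untested; Job.getInstance is the Hadoop 2.x factory — on older releases it would be new Job(conf)). A common cause of compile errors in exactly this kind of port is accidentally mixing the old org.apache.hadoop.mapred.FileInputFormat/FileOutputFormat with the new Job class; both must come from the mapreduce.lib.* packages:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public final class HttpDownloadMapReduce {
  public static void main(String... args) throws Exception {
    runJob(args[0], args[1]);
  }

  public static void runJob(String src, String dest)
      throws Exception {
    // JobConf is replaced by Configuration + Job.
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "http download");
    job.setJarByClass(HttpDownloadMap.class);

    // Delete the output directory up front, as the old driver did.
    Path destination = new Path(dest);
    FileSystem.get(conf).delete(destination, true);

    job.setMapperClass(HttpDownloadMap.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.setInputPaths(job, new Path(src));
    FileOutputFormat.setOutputPath(job, destination);

    // waitForCompletion replaces JobClient.runJob.
    if (!job.waitForCompletion(true)) {
      throw new Exception("Job failed");
    }
  }
}
```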
Run configuration:
args[0] = "testData/input/urls.txt"
args[1] = "testData/output"
urls.txt contains:
http://www.google.com
http://www.yahoo.com
Mapper class: http://textuploader.com/76sx – gkc123
Job runner class: http://textuploader.com/76si – the code links will expire in one year. – gkc123