目的:Hadoop的,成功的Map Reduce工作,但沒有輸出
我想使用的Map Reduce合併數據。我在同一個文件夾中有多組數據。
方法:
所以我跑的Map Reduce順序內的程序/流量多次合併工作。
問題:
,我面對的不是失敗的任務,但成功的工作,沒有輸出的問題。第一個(有時是兩個)迭代總是有輸出(part-r-00000),但不包括以下內容。我使用的樣本數據集,這是非常小的尺寸和體積(1〜2千,約5個文件)
我的嘗試:
使得線程每次運行後,但到5秒入睡徒勞無功。我試着在使用webhdfs之後檢查了很長一段時間,仍然沒有這樣的文件。
請問您能否給我啓發一下?提前致謝。
圖片:
代碼:
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package mergedata;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @author abcdefg
*/
public class MergeData extends Configured implements Tool{
/**
* @param args the command line arguments
*/
public static class ReadMapper
extends Mapper<Object, Text, Text, IntWritable>{
@Override
public void map(Object key, Text value, Mapper.Context context
) throws IOException, InterruptedException {
context.write(new Text(value.toString()), new IntWritable(1));
}
}
public static class MergeReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Reducer.Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
FileSystem hdfs = FileSystem.get(conf);
args = new GenericOptionsParser(conf, args).getRemainingArgs();
if (args.length != 3) {
System.err.println(args.length);
System.err.println("Usage: mergedata <input folder> <temporary folder> <output folder>");
System.exit(1);
}
// FileSystem fs = FileSystem.get(conf);
// ContentSummary cs = fs.getContentSummary(new Path(args[0]));
// long fileCount = cs.getFileCount();
Job job = Job.getInstance(conf);
job.setJarByClass(MergeData.class);
job.setMapperClass(ReadMapper.class);
job.setReducerClass(MergeReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// String files =().replaceAll(",", "," + args[0] + "/");
// FileInputFormat.addInputPaths(job, files);
int jobComplete = 1;
FileStatus[] fileStatus = hdfs.listStatus(new Path(args[0]));
HashMap<String,Pair<String,Long>> map = new HashMap<String,Pair<String,Long>>();
String tempName;
String tempKey;
Path tempPath;
for (FileStatus fileStatu : fileStatus) {
tempPath = fileStatu.getPath();
tempName = tempPath.getName();
tempKey = tempName.substring(0,12);
if (map.containsKey(tempKey)) {
map.put(tempKey,new Pair(map.get(tempKey).getLeft() + "," +
tempPath.toString(),
map.get(tempKey).getRight() + fileStatu.getLen()));
} else {
map.put(tempKey, new Pair(tempPath.toString(),fileStatu.getLen()));
}
}
String[] files = map.keySet().toArray(new String[map.keySet().size()]);
String[] inputFiles;
// String[] files = args[1].split(",");
for (String file : files)
{
System.out.println("file = " + file);
// FileInputFormat.addInputPath(job, new Path(args[0] + "/" + file + "*"));
System.out.println(args[2] + "/" + file);
if (hdfs.exists(new Path(args[2] + "/" + file))) {
System.out.println(file + " exists in " + args[2]);
map.put(file,new Pair(
map.get(file).getLeft() + "," + args[2] + "/" + file,
map.get(file).getRight() + hdfs.getFileStatus(new Path(args[2] + "/" + file)).getLen()
));
}
System.out.println("MR job input files : " + map.get(file).getLeft());
FileInputFormat.setInputPaths(job, map.get(file).getLeft());
System.out.println("MR job output dir : " + args[1] + "/" + file);
FileOutputFormat.setOutputPath(job ,new Path(args[1] + "/" + file));
if (hdfs.exists(new Path(args[1] + "/" + file))) {
hdfs.delete(new Path(args[1] + "/" + file), true); // Shouldn't occur
}
jobComplete = Math.max(jobComplete, (job.waitForCompletion(true))? 0 : 1);
// hdfs.getFileStatus(tempFile)
if (job.isSuccessful()) {
// Following sequence includes size check before deleting files
FileStatus[] filesStatuz = hdfs.listStatus(new Path(args[1] + "/" + file + "/part-r-00000"));
System.out.println("filesStatuz[0].getLen() = " + filesStatuz[0].getLen());
System.out.println("totalLen = " + map.get(file).getRight());
if (filesStatuz[0].getLen() >= map.get(file).getRight()) {
if (hdfs.exists(new Path(args[2] + "/" + file))) {
System.out.println("Found the main file of " + file);
hdfs.rename(new Path(args[2] + "/" + file), new Path(args[2] + "/" + file + "_tmp"));
}
hdfs.rename(new Path(args[1] + "/" + file + "/part-r-00000"), new Path(args[2] + "/" + file));
hdfs.delete(new Path(args[1] + "/" + file), true);
System.out.println("Done safe replacement");
// hdfs.delete(new Path(args[0] + "/" + file + "*"), false);
inputFiles = map.get(file).getLeft().split(",");
for (String inputFile : inputFiles) {
if (!inputFile.equals(args[2] + "/" + file)) {
hdfs.delete(new Path(inputFile), false);
System.out.println(inputFile + " has been deleted");
}
}
if (hdfs.exists(new Path(args[2] + "/" + file + "_tmp"))) {
hdfs.delete(new Path(args[2] + "/" + file + "_tmp"), false);
System.out.println("Deleted previous main file of " + file);
}
}
else {
System.out.println("Merging of " + file +"might have failed. Input and output size doesn't tally");
}
}
}
return(jobComplete);
}
public static void main(String[] args) throws Exception {
// TODO code application logic here
int exitCode = ToolRunner.run(new MergeData(), args);
System.exit(exitCode);
}
public class Pair<L,R> {
private final L left;
private final R right;
public Pair(L left, R right) {
this.left = left;
this.right = right;
}
public L getLeft() { return left; }
public R getRight() { return right; }
@Override
public int hashCode() { return left.hashCode()^right.hashCode(); }
@Override
public boolean equals(Object o) {
if (!(o instanceof Pair)) return false;
Pair pairo = (Pair) o;
return this.left.equals(pairo.getLeft()) &&
this.right.equals(pairo.getRight());
}
}
}
流量:
的本質是,它會結合類似日期例如文件:cdr_201輸入文件夾(args [0])中的50701_0,cdr_20150701_1添加到主文件(例如cdr_20150701)中並放入合併文件夾(args [3])。但是如果在合併之前存在這樣的主文件,則所有文件例如:cdr_20150701_0,cdr_20150701_1和cdr_20150701將被合併爲新的cdr_20150701。 part-0-00000將存儲在一個臨時文件夾(args [1])中。成功傳輸後,臨時文件夾和部件將被刪除。
似乎輸入目錄/文件不存在... – vefthym
你可以發佈你想運行作業的代碼?這不是一個成功的工作,它有一個FileNotFoundException,這顯然是問題。 – fd8s0
只是Driver類應該足夠啓動...我想這只是沒有設置第二個工作的輸入路徑作爲第一個輸出路徑的問題... – vefthym