到文件的值我有這樣, 一個記錄的輸入| 1 | Y, C | 0 | N, Ç| 1 | N, ð| 2 | Y, e | 1 | Y寫而無需移動到減速器
現在,在映射器中,我必須檢查第三列的值。如果是'Y',則該記錄必須直接寫入輸出文件,而不將該記錄移動到減速器或否則即'N'值記錄必須移動到減速器以供進一步處理。
因此, a | 1 | Y, ð| 2 | Y, E | 1 | Y 不應該去減速器但 C | 0 | N, ç| 1 | N 應到減速器,然後到輸出文件。
我該怎麼做?
到文件的值我有這樣, 一個記錄的輸入| 1 | Y, C | 0 | N, Ç| 1 | N, ð| 2 | Y, e | 1 | Y寫而無需移動到減速器
現在,在映射器中,我必須檢查第三列的值。如果是'Y',則該記錄必須直接寫入輸出文件,而不將該記錄移動到減速器或否則即'N'值記錄必須移動到減速器以供進一步處理。
因此, a | 1 | Y, ð| 2 | Y, E | 1 | Y 不應該去減速器但 C | 0 | N, ç| 1 | N 應到減速器,然後到輸出文件。
我該怎麼做?
你可能會做的是用MultipleOutputs - click here將'Y'和'N'類型的記錄分隔成兩個不同的映射器文件。
接下來,您爲兩個新生成的'Y'和'N'類型數據集運行分離作業。 對於'Y'類型,將減速器的數量設置爲0,以便減速器不被使用。而且,對於'N'類型,按照您想要使用減速器的方式進行。
希望這會有所幫助。
在您的地圖功能中,您將逐行獲取輸入。按照|使用|分割作爲分隔符。 (通過使用String.split()
方法是精確的) 它看起來像這樣
String[] line = value.toString().split('|');
訪問該陣列的第三個元素由line[2]
然後,使用一個簡單的if else
陳述,發射具有N值的輸出進一步處理。
if-else如何提供幫助?如果執行context.write()並且在工作中有一個reducer,它將從if或else中調用。我很抱歉,我不太明白。 – Tariq
看看這個工程,
public class Xxxx {
public static class MyMapper extends
Mapper<LongWritable, Text, LongWritable, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
Random r = new Random();
FileSplit split = (FileSplit)context.getInputSplit();
String fileName = split.getPath().getName();
FSDataOutputStream out = fs.create(new Path(fileName + "-m-" + r.nextInt()));
String parts[];
String line = value.toString();
String[] splits = line.split(",");
for(String s : splits) {
parts = s.split("\\|");
if(parts[2].equals("Y")) {
out.writeBytes(line);
}else {
context.write(key, value);
}
}
out.close();
fs.close();
}
}
public static class MyReducer extends
Reducer<LongWritable, Text, LongWritable, Text> {
public void reduce(LongWritable key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
for(Text t : values) {
context.write(key, t);
}
}
}
/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:9000");
conf.set("mapred.job.tracker", "localhost:9001");
Job job = new Job(conf, "Xxxx");
job.setJarByClass(Xxxx.class);
Path outPath = new Path("/output_path");
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
FileInputFormat.addInputPath(job, new Path("/input.txt"));
FileOutputFormat.setOutputPath(job, outPath);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
這不是那麼清楚。你只是不想運行減速機? –
我不想將第三列值的記錄移動爲'Y',剩餘部分應該移至減速器。 –
因此,您希望映射器將第3列'Y'的記錄直接寫入HDFS,並且第3列'N'的記錄只應寫入減速器? –