MapReduce - reducer emits output on one line

I have a simple MapReduce job that is supposed to read a dictionary from a text file and then process another large file line by line, computing an inverted document matrix. The output should look like this:
word-id1 docX:tfX docY:tfY
word-id2 docX:tfX docY:tfY etc...
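To make that concrete, here is a tiny plain-Java sketch (no Hadoop involved, made-up numbers) of what I expect one reduce call per key to produce. As far as I understand, TextOutputFormat ends every context.write(...) with a newline, so each key should land on its own line:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the reducer behaviour I expect:
// one output line per word-id, with the doc:tf values joined by spaces.
public class ExpectedGrouping {
    public static void main(String[] args) {
        // Made-up data: word-id -> list of "doc-id:tf" strings.
        Map<Integer, List<String>> grouped = new LinkedHashMap<Integer, List<String>>();
        grouped.put(1, Arrays.asList("12:3", "40:1"));
        grouped.put(2, Arrays.asList("12:1"));
        for (Map.Entry<Integer, List<String>> e : grouped.entrySet()) {
            StringBuilder sb = new StringBuilder();
            for (String docTf : e.getValue()) {
                sb.append(docTf).append(' ');
            }
            // Mirrors context.write(key, value): TextOutputFormat writes
            // key<TAB>value and terminates every write with a newline.
            System.out.println(e.getKey() + "\t" + sb.toString().trim());
        }
    }
}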
However, the reducer emits its output on one single huuuge line. I don't understand why, since it should emit a new line for each word-id (which is the reducer's key).

The mapper produces the correct output (pairs of word-id and doc-id:tf values, each on a separate line); I tested it without the reducer (a sketch of that mapper-only run is below, after the code). The reducer is supposed to append, for each key, all the values belonging to that key onto one line.

Could you please take a look at my code (especially the reducer and the job configuration) and tell me why the reducer emits only one huge line instead of one line per key? I have spent many hours debugging this and can't get my head around it.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// ArgumentParser is a project-local helper class (import omitted).

public class Indexer extends Configured implements Tool {

    /*
     * Vocabulary: key = term, value = index
     */
    private static Map<String, Integer> vocab = new HashMap<String, Integer>();

    public static void main(String[] arguments) throws Exception {
        System.exit(ToolRunner.run(new Indexer(), arguments));
    }

    public static class Comparator extends WritableComparator {

        protected Comparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Here we exploit the implementation of compareTo(...) in
            // Text.class.
            return -a.compareTo(b);
        }
    }

    public static class IndexerMapper extends
            Mapper<Object, Text, IntWritable, Text> {

        private Text result = new Text();

        // load vocab from distributed cache
        public void setup(Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
            Path getPath = new Path(cacheFiles[0].getPath());

            BufferedReader bf = new BufferedReader(new InputStreamReader(
                    fs.open(getPath)));
            String line = null;
            while ((line = bf.readLine()) != null) {
                StringTokenizer st = new StringTokenizer(line, " \t");

                int index = Integer.parseInt(st.nextToken()); // first token is the line number - term id
                String word = st.nextToken(); // second element is the term

                // save vocab
                vocab.put(word, index);
            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {

            // init TF map
            Map<String, Integer> mapTF = new HashMap<String, Integer>();

            // parse input string
            StringTokenizer st = new StringTokenizer(value.toString(), " \t");

            // first element is doc index
            int index = Integer.parseInt(st.nextToken());

            // count term frequencies
            String word;
            while (st.hasMoreTokens()) {
                word = st.nextToken();

                // check if word is in the vocabulary
                if (vocab.containsKey(word)) {
                    if (mapTF.containsKey(word)) {
                        int count = mapTF.get(word);
                        mapTF.put(word, count + 1);
                    } else {
                        mapTF.put(word, 1);
                    }
                }
            }

            // compute TF-IDF
            int wordIndex;
            for (String term : mapTF.keySet()) {
                int tf = mapTF.get(term);
                if (vocab.containsKey(term)) {
                    wordIndex = vocab.get(term);
                    context.write(new IntWritable(wordIndex), new Text(index + ":" + tf));
                }
            }
        }
    }

    public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            StringBuilder sb = new StringBuilder(16000);

            for (Text value : values) {
                sb.append(value.toString() + " ");
            }

            context.write(key, new Text(sb.toString()));
        }
    }

    /**
     * This is where the MapReduce job is configured and being launched.
     */
    @Override
    public int run(String[] arguments) throws Exception {
        ArgumentParser parser = new ArgumentParser("TextPreprocessor");

        parser.addArgument("input", true, true, "specify input directory");
        parser.addArgument("output", true, true, "specify output directory");

        parser.parseAndCheck(arguments);

        Path inputPath = new Path(parser.getString("input"));
        Path outputDir = new Path(parser.getString("output"));

        // Create configuration.
        Configuration conf = getConf();

        // add distributed file with vocabulary
        DistributedCache
                .addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);

        // Create job.
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(IndexerMapper.class);

        // Setup MapReduce.
        job.setMapperClass(IndexerMapper.class);
        //job.setCombinerClass(IndexerReducer.class);
        job.setReducerClass(IndexerReducer.class);

        // Sort the output words in reversed order.
        job.setSortComparatorClass(Comparator.class);

        job.setNumReduceTasks(1);

        // Specify (key, value).
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input.
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output.
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileSystem hdfs = FileSystem.get(conf);

        // Delete output directory (if exists).
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute the job.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
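For completeness, the mapper-only test I mentioned was configured roughly like this (a sketch, not the exact code; MapperOnlyCheck is just an illustrative name). With zero reduce tasks there is no shuffle/sort or reduce phase, and TextOutputFormat writes each map output pair directly as key[TAB]value, one pair per line:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Sketch of the mapper-only run used to check the map output.
public class MapperOnlyCheck {
    static void configureMapperOnly(Job job) {
        job.setMapperClass(Indexer.IndexerMapper.class);
        job.setNumReduceTasks(0); // skip the reduce phase entirely
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
    }
}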
Just to confirm, are you getting distinct keys in the mapper output? Could you also update the question with sample output? Also try opening the output file in WordPad and check whether the delimiters are there; if your lines are very long, you might simply be missing the line breaks. –
Yes, I am getting distinct keys from the mapper, that is confirmed... the mapper output format is key[TAB]value – Smajl
How did you confirm that your mapper output is correct? Did you set the number of reducers to 0? Also, I think you need to cast the objects in the comparator? Just try removing the custom comparator and see if it makes any difference. –
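One thing the last comment may be pointing at: the custom comparator is registered for Text (super(Text.class, true)) while the map output key is IntWritable, so the serialized keys get compared as if they were Text. A minimal sketch of a sort comparator that matches the actual key type, assuming reverse numeric order is the goal (ReverseIntComparator is a made-up name):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch: a sort comparator registered for the real map output key
// class (IntWritable), comparing typed keys in reverse numeric order.
public class ReverseIntComparator extends WritableComparator {
    protected ReverseIntComparator() {
        super(IntWritable.class, true); // matches the map output key class
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntWritable left = (IntWritable) a;  // the cast the comment asks about
        IntWritable right = (IntWritable) b;
        return -left.compareTo(right);       // reverse the natural int order
    }
}

Register it via job.setSortComparatorClass(ReverseIntComparator.class), or, as the comment suggests, simply delete the setSortComparatorClass(...) line first to rule the comparator out.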