2015-09-27

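Empty output files using MapReduce MultipleOutputs

I am using MultipleOutputs in my Reducer because I want a separate result file for each key. However, every one of those result files is empty, while the default result file part-r-xxxx is created and contains the correct values.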

Here are my job driver and reducer:

Main class

public static void main(String[] args) throws Exception { 
    int currentIteration = 0; 
    int reducerCount, roundCount; 

    Configuration conf = createConfiguration(currentIteration); 
    cleanEnvironment(conf); 
    Job job = Job.getInstance(conf, "cfim"); // new Job(conf, name) is deprecated in Hadoop 2+

    //Input and output format configuration 
    job.setMapperClass(TransactionsMapper.class); 
    job.setReducerClass(PatriciaReducer.class); 

    job.setInputFormatClass(TransactionInputFormat.class); 
    job.setMapOutputKeyClass(LongWritable.class); 
    job.setMapOutputValueClass(Text.class); 

    job.setOutputKeyClass(LongWritable.class); // the reducer emits LongWritable keys
    job.setOutputValueClass(Text.class); 

    reducerCount = roundCount = Math.floorDiv(getRoundCount(conf), Integer.parseInt(conf.get(MRConstants.mergeFactorSpecifier))); 

    FileInputFormat.addInputPath(job, new Path("/home/cloudera/datasets/input")); 
    Path outputPath = new Path(String.format(MRConstants.outputPathFormat, outputDir, currentIteration)); 
    FileOutputFormat.setOutputPath(job, outputPath); 
    MultipleOutputs.addNamedOutput(job, "key", TextOutputFormat.class, LongWritable.class, Text.class); 

    job.waitForCompletion(true); 
}

Reducer class

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PatriciaReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

    private ITreeManager treeManager;
    private SerializationManager serializationManager;
    private MultipleOutputs<LongWritable, Text> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        treeManager = new PatriciaTreeManager();
        serializationManager = new SerializationManager();
        mos = new MultipleOutputs<LongWritable, Text>(context);
    }

    @Override
    protected void reduce(LongWritable key, Iterable<Text> items, Context context)
            throws IOException, InterruptedException {

        Iterator<Text> patriciaIterator = items.iterator();
        PatriciaTree tree = new PatriciaTree();

        // The first value becomes the base tree...
        if (patriciaIterator.hasNext()) {
            Text input = patriciaIterator.next();
            tree = serializationManager.deserializePatriciaTree(input.toString());
        }

        // ...and every remaining value is merged into it.
        while (patriciaIterator.hasNext()) {
            Text input = patriciaIterator.next();
            PatriciaTree mergeableTree = serializationManager.deserializePatriciaTree(input.toString());
            tree = treeManager.mergeTree(tree, mergeableTree, false);
        }

        Text outputValue = new Text(serializationManager.serializeAsJson(tree));
        // One file per key via MultipleOutputs, plus the default part-r-xxxx output.
        mos.write("key", key, outputValue, generateOutputPath(key));
        context.write(key, outputValue);
    }

    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        mos.close();
    }

    private String generateOutputPath(LongWritable key) throws IOException {
        String outputPath = String.format("%s-%s", MRConstants.reduceResultValue, key.toString());
        return outputPath;
    }
}
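For reference (not part of the original question), a sketch of the file names the per-key writes should produce: when `MultipleOutputs.write` is given a `baseOutputPath`, Hadoop appends the task-type and partition suffix generated by `FileOutputFormat.getUniqueFile`, so with output compression off a key of 42 handled by reduce partition 0 should land in a file named like `reduceResultValue-42-r-00000` in the job output directory. The prefix `reduceResultValue` is a hypothetical stand-in for `MRConstants.reduceResultValue`, whose actual value the question does not show, and the helper names below are illustrative only.

```java
import java.util.Locale;

public class OutputFileNames {

    // Same pattern as generateOutputPath(key) in the reducer:
    // "<prefix>-<key>". The prefix passed in is a hypothetical placeholder.
    public static String baseOutputPath(String prefix, long key) {
        return String.format(Locale.ROOT, "%s-%s", prefix, key);
    }

    // Mirrors the "-r-NNNNN" suffix FileOutputFormat.getUniqueFile appends
    // for reduce tasks (5-digit, zero-padded partition number, no extension
    // when output is uncompressed).
    public static String reduceOutputFileName(String baseOutputPath, int partition) {
        return String.format(Locale.ROOT, "%s-r-%05d", baseOutputPath, partition);
    }

    public static void main(String[] args) {
        String base = baseOutputPath("reduceResultValue", 42L);
        System.out.println(reduceOutputFileName(base, 0));
    }
}
```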

Am I doing something wrong in my code?

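This question seems somewhat off-topic. What have you tried so far to understand the problem? Have you debugged it or tested specific scenarios?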

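Well, I noticed that the result files are created but are empty, even though the results themselves are not. Since I'm using a built-in output format, I just tried searching the web for similar issues.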

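It seems you have found the solution yourself. Don't forget to mark your answer as the accepted solution, or consider deleting your question if it is so specific that others won't benefit from it.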

Answer

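I found that I was closing the MultipleOutputs object in the wrong method. After moving mos.close() from the finalize method to the cleanup method, everything works perfectly.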
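The symptom (files that exist but are empty) is consistent with writers that were opened but never closed. `MultipleOutputs` opens a record writer, and so creates its file, on the first write to each base path, but buffered bytes are only guaranteed to reach that file once `mos.close()` runs. The framework calls `cleanup(Context)` once at the end of each task, whereas `finalize()` runs only if the garbage collector happens to collect the reducer before the task JVM exits, which is not guaranteed. The fix in the answer therefore amounts to overriding `cleanup(Context)` and calling `mos.close()` there. The sketch below is a plain-JDK analogy under that assumption, not Hadoop code: a `BufferedWriter` that is never closed leaves its file at 0 bytes. The class and method names are hypothetical.

```java
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

public class UnclosedWriterDemo {

    // Writes one short record, then either closes the writer (analogous to
    // calling mos.close() in cleanup()) or abandons it (analogous to relying
    // on finalize(), which may never run). Returns the bytes on disk afterwards.
    public static long writeAndMaybeClose(File file, boolean close) throws IOException {
        BufferedWriter writer = new BufferedWriter(new FileWriter(file));
        writer.write("42\tsample-value");
        writer.newLine();
        if (close) {
            writer.close(); // flushes buffered data to the file, then releases it
        }
        // When close is false the record is still sitting in memory buffers,
        // and the open file handle is deliberately leaked for the demo.
        return file.length();
    }

    public static void main(String[] args) throws IOException {
        File closed = File.createTempFile("closed", ".txt");
        File unclosed = File.createTempFile("unclosed", ".txt");
        closed.deleteOnExit();
        unclosed.deleteOnExit();
        System.out.println("closed writer:   " + writeAndMaybeClose(closed, true) + " bytes");
        System.out.println("unclosed writer: " + writeAndMaybeClose(unclosed, false) + " bytes");
    }
}
```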
