2017-04-27 63 views
I want to write out certain specific records in the reduce phase, depending on the key/value of each record. In Hadoop MapReduce I can do this with MultipleOutputs or an FSDataOutputStream, as in the code below. Does Spark's Java API have anything similar?

private FSDataOutputStream hdfsOutWriter;

@Override
public void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    int taskID = context.getTaskAttemptID().getTaskID().getId();
    // one side-output file per reduce task
    hdfsOutWriter = fs.create(new Path(fileName + taskID), true); // FSDataOutputStream
}

@Override
public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
    boolean isSpecificRecord = false;
    ArrayList<String> valueList = new ArrayList<String>();
    for (Text val : value) {
        String element = val.toString();
        if (filterFunction(element)) return;
        if (specificFunction(element)) isSpecificRecord = true;
        valueList.add(element);
    }
    String returnValue = anyFunction(valueList);
    String specificInfo = anyFunction2(valueList);
    if (isSpecificRecord) hdfsOutWriter.writeBytes(key.toString() + "\t" + specificInfo);
    context.write(key, new Text(returnValue));
}

我想運行火花集羣上這個過程的代碼,可能引發的Java API做到這一點像上面的代碼?

Answer


Just an idea of how to emulate it:

yoursRDD.mapPartitions(iter => {
    val fs = FileSystem.get(new Configuration())
    val ds = fs.create(new Path("outfileName_" + TaskContext.get.partitionId))
    ds.writeBytes("Put your results")
    ds.close()
    iter
})
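The sketch above is Scala; since the question asks about Spark's Java API, here is a rough Java equivalent. It is only a sketch: `yoursRDD`, the `"outfileName_"` path prefix, and the written string are placeholders carried over from the Scala snippet, not a tested implementation. Also note that `mapPartitions` is lazy, so the side files only appear once an action (e.g. `count()` or `saveAsTextFile(...)`) is run on the result.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;

// Sketch: write a per-partition side file on HDFS while passing the records through.
JavaRDD<String> result = yoursRDD.mapPartitions(iter -> {
    FileSystem fs = FileSystem.get(new Configuration());
    // one side-output file per partition, distinguished by partition id
    FSDataOutputStream ds = fs.create(
        new Path("outfileName_" + TaskContext.get().partitionId()));
    ds.writeBytes("Put your results");
    ds.close();
    return iter;  // the original records continue downstream unchanged
});
result.count();  // force evaluation so the side files are actually written
```

Inside the lambda you can filter the iterator's elements (the analogue of `specificFunction` in the question) and write only the matching ones to `ds` before returning the iterator.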

Nice trick, I'll try it – zzzzzzzz