2011-10-10 60 views
0

下面是這種情況Hadoop的流:寫輸出到不同的文件

  Reducer1 
     / 
Mapper - - Reducer2 
     \ 
      ReducerN 

在減速,我想寫在不同文件中的數據,可以說減速看起來像

def reduce(): 
    for line in sys.STDIN: 
    if(line == type1): 
     create_type_1_file(line) 
    if(line == type2): 
     create_type_2_file(line) 
    if(line == type3): 
     create_type3_file(line) 
     ... and so on 
def create_type_1_file(line): 
    # writes to file1 
def create_type2_file(line): 
    # writes to file2 
def create_type_3_file(line): 
    # write to file 3 

考慮路徑寫爲:

file1 = /home/user/data/file1 
file2 = /home/user/data/file2 
file3 = /home/user/data/file3 

當我在pseudo-distributed mode(machine with one node and hdfs daemons running)運行,事情是因爲所有的d好aemons將寫入同一組文件

問題: - 如果我在1000臺機器的羣集中運行此操作,它們是否會寫入同一組文件?我在這種情況下是writing to local filesystem
- 在hadoop streaming有沒有更好的方法來執行此操作?

謝謝

+0

這個答案可能會幫助(不知道的,因此評論

Job job = new Job(); FileInputFormat.setInputPath(job, inDir); //outDir is the root path, in this case, outDir="/home/user/data/" FileOutputFormat.setOutputPath(job, outDir); //You have to assign the output formatclass.Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000. To prevent this use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); instead of job.setOutputFormatClass(TextOutputFormat.class); in your Hadoop job configuration. LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(MOMap.class); job.setReducerClass(MOReduce.class); ... job.waitForCompletion(true); 

在減速用法)http://stackoverflow.com/questions/162 6786 /生成-分離 - 輸出 - 文件功能於Hadoop的流/ 1690092#1690092 – Nija

回答

0

通常減少的O/P被寫入像HDFS一個可靠的存儲系統,因爲如果其中一個節點出現故障則與該節點丟失有關的減少數據。在Hadoop框架的上下文之外再次運行特定的reduce任務是不可能的。另外,一旦作業完成,來自1000個節點的o/p必須針對不同的輸入類型進行合併。

HDFS中的並行寫入是not supported。可能存在多個縮減器可能正在寫入HDFS中的同一文件的情況,這可能會破壞文件。當多個reduce任務在單個節點上運行時,同時寫入單個本地文件時可能會出現併發問題。

其中一個解決方案是有reduce task specific file name,後來將所有文件合併爲一個特定的輸入類型。

0

可以使用MultipleOutputs類從Reducer將輸出寫入多個位置。您可以將file1,file2和file3視爲三個文件夾,並分別向這些文件夾寫入1000個Reducers的輸出數據。作業提交


使用模式:

private MultipleOutputs out; 

public void setup(Context context) { 

    out = new MultipleOutputs(context); 

    ... 

} 

public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { 

//'/' characters in baseOutputPath will be translated into directory levels in your file system. Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc. No call to context.write() is necessary. 
for (Text line : values) { 

    if(line == type1) 
     out.write(key, new Text(line),"file1/part"); 

    else if(line == type2) 
     out.write(key, new Text(line),"file2/part"); 

else if(line == type3) 
     out.write(key, new Text(line),"file3/part"); 
    } 
} 

protected void cleanup(Context context) throws IOException, InterruptedException { 
     out.close(); 
    } 

裁判:https://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html