2013-07-30 81 views
3

我是Hadoop的新手,但這是我上個月的一個學習項目。Hadoop 1輸入文件= 1輸出文件,僅限地圖

在試圖保持這種含糊不清的,是有用的人,讓我扔出去的基本目標第一....假設:

  1. 你有一個大的數據集(明顯),數以百萬計基本的ASCII文本文件。
    • 每個文件都是「記錄」。
  2. 的記錄被存儲在一個目錄結構,以確定客戶&日期
    • 例如/用戶/ hduser /數據/ customer1表/ YYYY-MM-DD,/用戶/ hduser /數據/的customer2/YYYY-MM-DD
  3. 你想模仿輸入結構的輸出結構
    • 例如/用戶/ hduser /出/ customer1表/ YYYY-MM-DD,/用戶/ hduser /出/的customer2/YYYY-MM-DD

我已經看過多線程:

還有更多..我也一直在閱讀湯姆懷特的Hadoop書。我一直在努力學習這一點。而且我經常在新API和舊API之間交換,這增加了嘗試學習這一點的困惑。

許多人指出MultipleOutputs(或舊的api版本),但我似乎無法產生我想要的輸出 - 例如,MultipleOutputs似乎不接受「/」來創建目錄結構寫()

需要採取哪些步驟來創建具有所需輸出結構的文件? 目前,我有一個WholeFileInputFormat類,以及相關RecordReader具有(NullWritable K,ByteWritable V)對(如果需要的話,可以改變)

我的地圖設置:

public class MapClass extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> { 
    private Text filenameKey; 
    private MultipleOutputs<NullWritable, Text> mos; 

    @Override 
    protected void setup(Context context) throws IOException, InterruptedException { 
     InputSplit split = context.getInputSplit(); 
     Path path = ((FileSplit) split).getPath(); 
     filenameKey = new Text(path.toString().substring(38)); // bad hackjob, until i figure out a better way.. removes hdfs://master:port/user/hduser/path/ 
     mos = new MultipleOutputs(context); 
    } 
} 

還有一個清理()函數調用mos.close()圖()功能是目前未知的(我需要幫助這裏)

這是足夠的信息指出一個新手在答案的方向?我的下一個想法是在每個map()任務中創建一個MultipleOutputs()對象,每個對象都有一個新的baseoutput字符串,但我不確定它是否有效,甚至是正確的操作。

建議將不勝感激,程序中的任何內容都可以改變,除了輸入 - 我只是想學習框架 - 但我想盡可能接近這個結果(稍後我可能會考慮將記錄結合到更大的文件,但它們已經是每個記錄20MB,並且我想確保它在我無法在記事本中讀取之前能夠正常工作。

編輯:可以通過修改/擴展TextOutputFormat.class?似乎它可能有一些方法可以工作,但我不確定哪些方法我需要重寫...

+0

我還沒有嘗試過,但書「的Hadoop權威指南」說,從最新的API中MultipleOutputs支持使用文件路徑分隔符(/)。你是說它不起作用嗎? – Rags

+0

@Rags這可能是我執行MultipleOutputs時的一個錯誤 – Pseudo

回答

5

如果您關閉投機執行,那麼我沒有什麼能夠阻止你在你的映射器中手動創建輸出文件夾結構/文件,並向它們寫入記錄(忽略輸出上下文/收集器)

例如,擴展片段(setup方法)這(這基本上是什麼多個輸出是幹什麼的,但假設推測執行被關閉,以避免在兩個地圖的任務都試圖寫入同一個輸出文件的文件衝突):

import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

public class MultiOutputsMapper extends 
     Mapper<LongWritable, Text, NullWritable, NullWritable> { 
    protected String filenameKey; 
    private RecordWriter<Text, Text> writer; 
    private Text outputValue; 
    private Text outputKey; 

    @Override 
    protected void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 
     // operate on the input record 
     // ... 

     // write to output file using writer rather than context 
     writer.write(outputKey, outputValue); 
    } 

    @Override 
    protected void setup(Context context) throws IOException, 
      InterruptedException { 
     InputSplit split = context.getInputSplit(); 
     Path path = ((FileSplit) split).getPath(); 

     // extract parent folder and filename 
     filenameKey = path.getParent().getName() + "/" + path.getName(); 

     // base output folder 
     final Path baseOutputPath = FileOutputFormat.getOutputPath(context); 
     // output file name 
     final Path outputFilePath = new Path(baseOutputPath, filenameKey); 

     // We need to override the getDefaultWorkFile path to stop the file being created in the _temporary/taskid folder 
     TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() { 
      @Override 
      public Path getDefaultWorkFile(TaskAttemptContext context, 
        String extension) throws IOException { 
       return outputFilePath; 
      } 
     }; 

     // create a record writer that will write to the desired output subfolder 
     writer = tof.getRecordWriter(context); 
    } 

    @Override 
    protected void cleanup(Context context) throws IOException, 
      InterruptedException { 
     writer.close(context); 
    } 
} 

考慮幾點:

  • customerx/yyyy-MM-dd路徑文件或文件夾(如果是文件夾,則需要相應修改 - 此實現假定每個日期有一個文件,文件名爲yyyy-MM-dd)
  • 不妨看看LazyOutputFormat防止空輸出映射文件被創建
+0

我使用了你的骨架並從中學到了很多東西......並且作爲一種學習工具,你的代碼非常優秀** ..你是對的,'yyyy- MM-dd'是另一個文件夾,其中有一個文件。帶了我一些玩,但得到它的工作,其中一個棘手的位是輸入源需要是'/用戶/ hduser /數據/ *'(與明星),因爲它將任務映射到子目錄中的所有文件。我還在作業配置中實現了'NullOutputFormat'(而不是Lazy),並且在設置時使用'TextOutputFormat'(儘管懶惰是一種方便的格式來了解!)非常感謝Chris的指點! – Pseudo

+0

@Chris,只是澄清,這個問題也不能用MultipleOutputs(new API)解決嗎? (使用WholeFileInputFormat(自定義類,isSplittable爲false,並使用FileSplit的路徑)? – Rags

+0

@Rags,可能但我對嘗試做同樣的事情有一些模糊的記憶,但是在基本輸出路徑中存在路徑分隔符問題。也許這已經在更新的版本中修復了。當然值得一試 –