2015-11-05 53 views
1

我有問題將二進制文件(這是作爲序列文件存儲在Hadoop中)複製到本地機器。問題是我從hdfs下載的二進制文件不是我在運行map-reduce任務時生成的原始二進制文件。我搜索了類似的問題,我想問題是,當我將序列文件複製到本地機器時,我得到了序列文件的頭文件和原始文件。Java - 在Hadoop中下載序列文件

我的問題是:有沒有辦法避免下載的頭,但仍保持我原來的二進制文件?

有兩種方法我能想到的:

  1. 我可以轉換二進制文件到像文本的其他格式,這樣我可以儘量避免使用SequenceFile。在我做copyToLocal之後,我將它轉換回二進制文件。

  2. 我仍然可以使用序列文件。但是當我生成二進制文件時,我還會生成一些關於相應序列文件的元信息(例如,頭文件的長度和文件的原始長度)。在我做copyToLocal之後,我使用下載的二進制文件(包含頭文件等)以及元信息來恢復我的原始二進制文件。

我不知道哪一個是可行的。誰能給我一個解決方案?你能不能給我看一些你給的解決方案的示例代碼?

我非常感謝您的幫助。

+0

如何從SequenceFile下載二進制內容?請發佈您的代碼。 –

回答

1

我找到了解決此問題的方法。由於下載序列文件會給你在二進制文件中的標題和其他魔術字,我避免這個問題的方式是將我的原始二進制文件轉換爲Base64字符串並將其作爲文本存儲在HDFS中,並且當下載編碼的二進制文件時,我解碼它回到我原來的二進制文件。

我知道這將需要額外的時間,但目前我沒有找到任何其他辦法解決這個問題。直接刪除序列文件中的標題和其他魔術字的難點在於Hadoop可能會在我的二進制文件之間插入一些「Sync」字。

如果有人有更好的解決這個問題,我會很高興聽到這一點。 :)

0

使用MapReduce的代碼讀取SequenceFile並使用SequenceFileInputFormat作爲InputFileFormat讀取HDFS序列文件。這會將文件分割爲Key Value對,並且該值只有可用於創建二進制文件的二進制文件內容。

下面的代碼片段,以拆分是由多張圖片,並拆分成單獨的二進制文件,並將其寫入到本地文件系統中的序列文件。

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.BytesWritable; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class CreateOrgFilesFromSeqFile { 

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 

     if (args.length !=2){ 
      System.out.println("Incorrect No of args (" + args.length + "). Expected 2 args: <seqFileInputPath> <outputPath>"); 
      System.exit(-1); 
     } 

     Path seqFileInputPath = new Path(args[0]); 
     Path outputPath = new Path(args[1]); 

     Configuration conf = new Configuration(); 
     Job job = Job.getInstance(conf, "CreateSequenceFile"); 

     job.setJarByClass(M4A6C_CreateOrgFilesFromSeqFile.class); 
     job.setMapperClass(CreateOrgFileFromSeqFileMapper.class); 

     job.setInputFormatClass(SequenceFileInputFormat.class); 

     job.setOutputKeyClass(NullWritable.class); 
     job.setOutputValueClass(Text.class); 

     FileInputFormat.addInputPath(job, seqFileInputPath); 
     FileOutputFormat.setOutputPath(job, outputPath); 

     //Delete the existing output File 
     outputPath.getFileSystem(conf).delete(outputPath, true); 

     System.exit(job.waitForCompletion(true)? 0 : -1); 

    } 

} 

class CreateOrgFileFromSeqFileMapper extends Mapper<Text, BytesWritable, NullWritable, Text>{ 

    @Override 
    public void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException{ 


     Path outputPath = FileOutputFormat.getOutputPath(context); 
     FileSystem fs = outputPath.getFileSystem(context.getConfiguration()); 

     String[] filePathWords = key.toString().split("/"); 
     String fileName = filePathWords[filePathWords.length-1]; 

     System.out.println("outputPath.toString()+ key: " + outputPath.toString() + "/" + fileName + "value length : " + value.getLength()); 

     try(FSDataOutputStream fdos = fs.create(new Path(outputPath.toString() + "/" + fileName));){ 

      fdos.write(value.getBytes(),0,value.getLength()); 
      fdos.flush(); 
     } 

      //System.out.println("value: " + value + ";\t baos.toByteArray().length: " + baos.toByteArray().length); 
      context.write(NullWritable.get(), new Text(outputPath.toString() + "/" + fileName));    
    } 
}