2013-10-09 23 views
0

我想讓與 CDH 捆綁的 Grep 示例程序讀取 Sequence/Snappy 文件,因此需要修改 Grep 來解析 Sequence/Snappy 文件。

默認情況下,該程序在嘗試讀取 Sequence/Snappy 文件時會引發以下錯誤:

java.io.EOFException: Unexpected end of input stream
    at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:121)
    at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:95)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:83)
    at java.io.InputStream.read(InputStream.java:82)

所以我編輯了代碼來讀取序列文件。

更改:

FileInputFormat.setInputPaths(grepJob, args[0]); 

要:

FileInputFormat.setInputPaths(grepJob, args[0]); 
    grepJob.setInputFormatClass(SequenceFileAsTextInputFormat.class); 

但我仍然得到同樣的錯誤。

1)我是否需要手動設置輸入壓縮編解碼器?我認爲SequenceFile閱讀器會自動檢測壓縮。
2)如果我需要手動設置壓縮,我使用「setInputFormatClass」來做,還是我在「conf」對象中設置的東西?

回答

0

我的代碼可以正常工作了。令我有點困惑的是,我並不需要在代碼中的任何位置指定壓縮編解碼器。這是原始代碼:

package org.myorg; 

import java.util.Random; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.mapreduce.lib.input.*; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.map.InverseMapper; 
import org.apache.hadoop.mapreduce.lib.map.RegexMapper; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 
import org.apache.hadoop.io.SequenceFile.CompressionType; 
import org.apache.hadoop.io.SequenceFile.Metadata; 
import org.apache.hadoop.io.compress.*; 


/* Extracts matching regexs from input files and counts them. */ 
/*
 * Extracts matching regexes from input files and counts them.
 *
 * Input is read via SequenceFileAsTextInputFormat, which transparently
 * decompresses block-compressed (e.g. Snappy) SequenceFile data — no
 * explicit codec configuration is required for reading.
 */
public class Grep extends Configured implements Tool {
  private Grep() {} // singleton — instantiated only via main()

  /**
   * Runs the two-job pipeline: a grep/count job writing to a temporary
   * directory, then a sort job ordering results by decreasing frequency.
   *
   * @param args inDir, outDir, regex, and an optional capture group index
   * @return 0 on success, 1 if either job failed, 2 on bad usage
   * @throws Exception if job submission or filesystem access fails
   */
  public int run(String[] args) throws Exception {
    if (args.length < 3) {
      System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
      ToolRunner.printGenericCommandUsage(System.out);
      return 2;
    }

    // Randomized name keeps concurrent runs from colliding on the temp dir.
    Path tempDir =
        new Path("grep-temp-"
            + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

    Configuration conf = getConf();
    conf.set(RegexMapper.PATTERN, args[2]);
    if (args.length == 4) {
      conf.set(RegexMapper.GROUP, args[3]);
    }

    // Job.getInstance(conf) replaces the deprecated Job(Configuration) ctor.
    Job grepJob = Job.getInstance(conf);

    try {
      grepJob.setJobName("grep-search");

      FileInputFormat.setInputPaths(grepJob, args[0]);
      // Reads SequenceFiles and hands keys/values to the mapper as Text;
      // Snappy block compression is detected and decoded automatically.
      grepJob.setInputFormatClass(SequenceFileAsTextInputFormat.class);

      grepJob.setMapperClass(RegexMapper.class);

      grepJob.setCombinerClass(LongSumReducer.class);
      grepJob.setReducerClass(LongSumReducer.class);
      FileOutputFormat.setOutputPath(grepJob, tempDir);
      grepJob.setOutputFormatClass(SequenceFileOutputFormat.class);
      grepJob.setOutputKeyClass(Text.class);
      grepJob.setOutputValueClass(LongWritable.class);

      // Bug fix: the original ignored the jobs' success flags and always
      // returned 0 — propagate failure to the caller (and to System.exit).
      if (!grepJob.waitForCompletion(true)) {
        return 1;
      }

      Job sortJob = Job.getInstance(conf);
      sortJob.setJobName("grep-sort");

      FileInputFormat.setInputPaths(sortJob, tempDir);
      sortJob.setInputFormatClass(SequenceFileInputFormat.class);

      sortJob.setMapperClass(InverseMapper.class);

      sortJob.setNumReduceTasks(1); // write a single file
      FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
      sortJob.setSortComparatorClass( // sort by decreasing freq
          LongWritable.DecreasingComparator.class);

      return sortJob.waitForCompletion(true) ? 0 : 1;
    } finally {
      // Always remove the intermediate output, even when a job fails.
      FileSystem.get(conf).delete(tempDir, true);
    }
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new Grep(), args);
    System.exit(res);
  }
}

我很困惑,因爲在一個簡單的「cat」程序中,我以爲必須設置壓縮編解碼器。

package org.myorg; 
import java.io.*; 
import java.util.*; 
import java.net.*; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.util.*; 
import org.apache.hadoop.io.SequenceFile.CompressionType; 
import org.apache.hadoop.io.SequenceFile.Metadata; 
import org.apache.hadoop.io.compress.*; 
import java.nio.charset.*; 

public class seqcat {

  /**
   * Prints every value of a SequenceFile to stdout, decoded as UTF-8 text.
   *
   * <p>No explicit codec setup is needed: {@code SequenceFile.Reader}
   * detects the compression codec from the file header. (The original
   * created an unused CompressionCodecFactory/DefaultCodec pair and an
   * unused local FileSystem — both removed.)
   *
   * <p>Assumes values are {@code BytesWritable} — a ClassCastException is
   * raised otherwise.
   *
   * @param args args[0] is the path of the SequenceFile to dump
   * @throws IOException if the file cannot be opened or read
   */
  public static void main(String[] args) throws IOException {
    String uri = args[0];
    Configuration conf = new Configuration();
    Path seqFilePath = new Path(uri);

    // try-with-resources fixes a leak: the original never closed the
    // reader when an exception was thrown mid-read.
    try (SequenceFile.Reader reader =
            new SequenceFile.Reader(conf, SequenceFile.Reader.file(seqFilePath))) {
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

      while (reader.next(key, value)) {
        byte[] strBytes = ((BytesWritable) value).getBytes();
        // getBytes() returns a padded backing array; honor getLength().
        int len = ((BytesWritable) value).getLength();
        System.out.println(new String(strBytes, 0, len, StandardCharsets.UTF_8));
      }
    }
  }
}