2012-04-19

Reading from a txt file and writing to HBase

I am trying to read from a txt file and write its contents to HBase.

Job Class
    Job job = new Job(conf, "HWriterJob");
    job.setJarByClass(HWriterJob.class);
    FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

    job.setMapperClass(TokenizerMapper.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);

    TableMapReduceUtil.initTableReducerJob(table, null, job);

Mapper Class
    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();

        StringTokenizer st = new StringTokenizer(line, "|");
        String[] result = new String[st.countTokens()];
        int i = 0;
        while (st.hasMoreTokens()) {
            result[i] = st.nextToken();
            i++;
        }

        Map<ImmutableBytesWritable, Put> resultSet = writeToHBase(result);

        for (Map.Entry<ImmutableBytesWritable, Put> entry : resultSet.entrySet()) {
            context.write(new Text(entry.getValue().getRow()), entry.getValue());
        }
    }
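
A side note on the tokenizing step in the mapper: StringTokenizer treats consecutive delimiters as one, so empty fields are silently dropped and later columns shift position. The sketch below is plain Java with no Hadoop dependency; PipeSplitDemo, its two helper methods, and the sample record are made up for illustration and are not part of the original job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

class PipeSplitDemo {
    // Same approach as the mapper: StringTokenizer with "|" as the delimiter.
    static List<String> withTokenizer(String line) {
        StringTokenizer st = new StringTokenizer(line, "|");
        List<String> out = new ArrayList<>();
        while (st.hasMoreTokens()) {
            out.add(st.nextToken());
        }
        return out;
    }

    // Alternative: split on a literal pipe; limit -1 keeps empty fields.
    static List<String> withSplit(String line) {
        return Arrays.asList(line.split("\\|", -1));
    }

    public static void main(String[] args) {
        // Hypothetical record whose middle field is empty.
        String line = "row1||value3";
        System.out.println(withTokenizer(line)); // [row1, value3]
        System.out.println(withSplit(line));     // [row1, , value3]
    }
}
```

If the input can contain empty fields, the split-based variant keeps column positions stable; if it cannot, the tokenizer loop in the mapper gives the same result.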

Reducer Class
    public void reduce(Text key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {

        for (Put val : values) {
            context.write(key, val);
        }
    }

However, this does not work.

I get the following error: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.Text

Answers

Apparently MapReduce uses LongWritable as the default MapOutputKeyClass; in your case it should be Text, hence the error.

Try setting job.setMapOutputKeyClass(Text.class), and set job.setMapOutputValueClass appropriately as well.
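
A related detail that may be worth checking: with Hadoop's default TextInputFormat, the map input key is a LongWritable (the byte offset of the line) and the value is the line as Text, so a map method declared with a Text key runs into this same cast. The following is a toy model in plain Java, not Hadoop code. ToyMapper, LineMapper and feed are hypothetical stand-ins, with Long and String playing the roles of LongWritable and Text. It only illustrates why such a mismatch surfaces at runtime instead of at compile time.

```java
class KeyTypeMismatchDemo {
    // Stand-in for a framework mapper base class (NOT the Hadoop API).
    abstract static class ToyMapper<K, V> {
        abstract String map(K key, V value);
    }

    // Declares a String key, the way the question's mapper declares a Text key.
    static class LineMapper extends ToyMapper<String, String> {
        @Override
        String map(String key, String value) {
            return key + "=" + value;
        }
    }

    // Stand-in for the framework's record loop: the runtime key type comes from
    // the input side, and generics are erased, so nothing is checked here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String feed(ToyMapper mapper, Object key, Object value) {
        return mapper.map(key, value);
    }

    public static void main(String[] args) {
        try {
            // A Long "offset" key reaches a mapper that expects a String key.
            feed(new LineMapper(), 0L, "a|b|c");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

If that is the cause here, declaring the mapper as Mapper<LongWritable, Text, ...> with map(LongWritable key, Text value, Context context) is the usual shape for TextInputFormat input.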

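Job Class
    Job job = new Job(conf, "HWriterJob");
    job.setJarByClass(HWriterJob.class);

    // Read plain text: TextInputFormat hands the mapper a LongWritable
    // byte offset as the key and the line (Text) as the value.
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));

    job.setMapperClass(TokenizerMapper.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);

    // Write directly to HBase. TableOutputFormat needs the target table name;
    // assumption: `table` is the same variable used in the question.
    job.setOutputFormatClass(TableOutputFormat.class);
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);

    // Map-only job: no reducer, the mapper's Puts go straight to the table.
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);

Mapper Class
    Map<ImmutableBytesWritable, Put> resultSet = writeToHBase(result);

    // Emit the ImmutableBytesWritable key declared in the job, not a Text key.
    for (Map.Entry<ImmutableBytesWritable, Put> entry : resultSet.entrySet()) {
        context.write(entry.getKey(), entry.getValue());
    }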