
How can I configure a MapReduce job to use an HBase table as both source and sink within Spring? I plan to create a batch job within Spring XD that runs a MapReduce job, but that MapReduce job needs HBase tables as the source and sink of the Hadoop job, i.e. something like TableMapReduceUtil.initTableMapperJob() and TableMapReduceUtil.initTableReducerJob() for a MapReduce job inside a Spring XD batch job.

The <hdp:job> namespace does not currently support specifying input/output tables.
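
(For context, the sketch below approximates what TableMapReduceUtil.initTableMapperJob() wires onto a job in HBase 0.94-era releases; the helper name wireHBaseSource is hypothetical. The real utility additionally Base64-serializes the Scan into the configuration under "hbase.mapreduce.scan", which is exactly the kind of setup the static <hdp:job> attributes cannot express.)

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class TableMapperWiringSketch {

    // Approximation of TableMapReduceUtil.initTableMapperJob(table, scan,
    // mapper, Text.class, Result.class, job); the Scan serialization step is
    // omitted here because it goes through HBase-internal helpers.
    public static void wireHBaseSource(Job job, String sourceTable,
            Class<? extends TableMapper<Text, Result>> mapperClass) {
        job.setInputFormatClass(TableInputFormat.class); // read splits directly from HBase regions
        job.setMapperClass(mapperClass);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Result.class);
        // "hbase.mapreduce.inputtable"
        job.getConfiguration().set(TableInputFormat.INPUT_TABLE, sourceTable);
    }
}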

Answer


I was able to work around this by using another bean that takes the Hadoop job as input and returns the job after setting up the Scan() and the source and sink HBase tables. With scope="prototype" on the job bean and scope="job" on the tasklet I am able to run the same MR job in Spring XD multiple times; without this, after the first successful run the job gets stuck in RUNNING state instead of going back to the DEFINE state.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class InitJobTasklet {

    private Job job;

    // Receives the <hdp:job> prototype bean from the application context.
    public void setJob(Object job) {
        this.job = (Job) job;
    }

    // Referenced as #{initTask.job} in the tasklet definition: wires the
    // HBase source and sink tables onto the job just before submission.
    public Job getJob() throws IOException {
        Scan scan = new Scan();
        System.out.println("Initializing the hadoop job with hbase tables and scan object...");

        // Mapper.class and Reducer.class are placeholders for the actual
        // TableMapper/TableReducer implementations.
        TableMapReduceUtil.initTableMapperJob("SourceTable",
                scan,
                Mapper.class,
                Text.class, Result.class, job);

        TableMapReduceUtil.initTableReducerJob(
                "TargetTable",   // output table
                Reducer.class,   // reducer class
                job);
        job.setNumReduceTasks(1);

        return job;
    }
}

The Spring Batch job configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Prototype scope so each batch execution gets a fresh Job instance;
         "mapperclass", "reduceclass" and "processor class" are placeholders. -->
    <hdp:job id="mr-hbase-job"
        output-path="/output"
        mapper="mapperclass"
        reducer="reduceclass"
        map-key="org.apache.hadoop.hbase.io.ImmutableBytesWritable"
        map-value="org.apache.hadoop.hbase.client.Result"
        input-format="org.apache.hadoop.hbase.mapreduce.TableInputFormat"
        output-format="org.apache.hadoop.hbase.mapreduce.TableOutputFormat"
        jar-by-class="processor class"
        scope="prototype">
    </hdp:job>

    <batch:job id="job">
        <batch:step id="step1">
            <!-- Job scope: #{initTask.job} is re-evaluated on every launch. -->
            <hdp:job-tasklet id="hadoop-tasklet" job="#{initTask.job}" wait-for-completion="true" scope="job"/>
        </batch:step>
    </batch:job>

    <hdp:configuration id="hadoopConfiguration">
        fs.defaultFS=hdfs://localhost:9000
        hadoop.tmp.dir=/home/smunigati/hadoop/temp
        hbase.zookeeper.quorum=localhost
        hbase.zookeeper.property.clientPort=2181
    </hdp:configuration>

    <hdp:hbase-configuration id="hbaseConfiguration" configuration-ref="hadoopConfiguration"/>

    <bean id="initTask" class="com.somthing.InitJobTasklet" scope="prototype">
        <property name="job" ref="mr-hbase-job"/>
    </bean>

</beans>
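
For completeness, here is a minimal sketch (against HBase 0.94-era APIs, with hypothetical class names; each class belongs in its own source file) of what the classes referenced above as "mapperclass" and "reduceclass" could look like. TableMapper fixes the input types to ImmutableBytesWritable/Result, and the Text/Result output types match what the tasklet passes to initTableMapperJob():

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

// Emits each scanned row keyed by its row key.
public class SourceTableMapper extends TableMapper<Text, Result> {
    @Override
    protected void map(ImmutableBytesWritable rowKey, Result columns, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(rowKey.copyBytes()), columns);
    }
}

// Copies every cell of each row into the target table as a Put.
public class TargetTableReducer extends TableReducer<Text, Result, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text rowKey, Iterable<Result> rows, Context context)
            throws IOException, InterruptedException {
        for (Result row : rows) {
            Put put = new Put(rowKey.copyBytes());
            for (KeyValue kv : row.raw()) {
                put.add(kv); // copy the cell unchanged (0.94-era API)
            }
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }
}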