2015-06-17 17 views

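Oozie workflow with Avro output produces a corrupt Avro file

When I run the mapred job manually, it produces a valid Avro file with the .avro extension. But when I run it through an Oozie workflow, it produces a text file, which is a corrupt Avro file. Here is my workflow: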

<workflow-app name='sample-wf' xmlns="uri:oozie:workflow:0.2">

<start to='start_here'/>

<action name='start_here'>
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
            <delete path="${nameNode}/user/hadoop/${workFlowRoot}/final-output-data"/>
        </prepare>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
            <property>
                <name>mapred.reducer.new-api</name>
                <value>true</value>
            </property>
            <property>
                <name>mapred.mapper.new-api</name>
                <value>true</value>
            </property>
            <property>
                <name>mapred.input.dir</name>
                <value>/user/hadoop/${workFlowRoot}/input-data</value>
            </property>
            <property>
                <name>mapred.output.dir</name>
                <value>/user/hadoop/${workFlowRoot}/final-output-data</value>
            </property>

            <property>
                <name>mapreduce.mapper.class</name>
                <value>org.apache.avro.mapred.HadoopMapper</value>
            </property>
            <property>
                <name>mapreduce.reducer.class</name>
                <value>org.apache.avro.mapred.HadoopReducer</value>
            </property>
            <property>
                <name>avro.mapper</name>
                <value>com.flipkart.flap.data.batch.mapred.TestAvro$CFDetectionMapper</value>
            </property>
            <property>
                <name>avro.reducer</name>
                <value>com.flipkart.flap.data.batch.mapred.TestAvro$CFDetectionReducer</value>
            </property>
            <property>
                <name>mapreduce.input.format.class</name>
                <value>org.apache.avro.mapreduce.AvroKeyInputFormat</value>
            </property>
            <property>
                <name>avro.schema.input.key</name>
                <value>{... schema ...}</value>
            </property>

            <property>
                <name>mapreduce.mapoutput.key.class</name>
                <value>org.apache.hadoop.io.AvroKey</value>
            </property>
            <property>
                <name>avro.map.output.schema.key</name>
                <value>{... schema ...}</value>
            </property>

            <property>
                <name>mapreduce.mapoutput.value.class</name>
                <value>org.apache.hadoop.io.Text</value>
            </property>
            <property>
                <name>mapreduce.output.format.class</name>
                <value>org.apache.avro.mapred.AvroKeyValueOutputFormat</value>
            </property>
            <property>
                <name>mapreduce.output.key.class</name>
                <value>org.apache.avro.mapred.AvroKey</value>
            </property>
            <property>
                <name>mapreduce.output.value.class</name>
                <value>org.apache.avro.mapred.AvroValue</value>
            </property>

            <property>
                <name>avro.schema.output.key</name>
                <value>{ .... schema .... }</value>
            </property>
            <property>
                <name>avro.schema.output.value</name>
                <value>"string"</value>
            </property>
            <property>
                <name>mapreduce.output.key.comparator.class</name>
                <value>org.apache.avro.mapred.AvroKeyComparator</value>
            </property>
            <property>
                <name>io.serializations</name>
                <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization
                </value>
            </property>
        </configuration>
    </map-reduce>
    <ok to='end'/>
    <error to='fail'/>
</action>

<kill name='fail'>
    <message>MapReduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>

<end name='end'/>

</workflow-app>

My mapper and reducer are defined like this:

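// Nested classes of TestAvro; both extend the new-API (org.apache.hadoop.mapreduce) base classes.
public static class CFDetectionMapper extends
        Mapper<AvroKey<AdClickFraudSignalsEntity>, NullWritable, AvroKey<AdClickFraudSignalsEntity>, Text> {
    // map() body omitted in the original post
}

public static class CFDetectionReducer extends
        Reducer<AvroKey<AdClickFraudSignalsEntity>, Text, AvroKey<AdClickFraudSignalsEntity>, AvroValue<CharSequence>> {
    // reduce() body omitted in the original post
}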

Could you please tell me what is wrong here?

Answer


You are using some wrong property names:

<property> 
    <name>mapreduce.mapoutput.key.class</name> 
    <value>org.apache.hadoop.io.AvroKey</value> 
</property> 
[...] 
<property> 
    <name>mapreduce.mapoutput.value.class</name> 
    <value>org.apache.hadoop.io.Text</value> 
</property> 

should be:

<property> 
    <name>mapreduce.map.output.key.class</name> 
    <value>org.apache.hadoop.io.AvroKey</value> 
</property> 
[...] 
<property> 
    <name>mapreduce.map.output.value.class</name> 
    <value>org.apache.hadoop.io.Text</value> 
</property> 

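(note the added dot between "map" and "output"). The older mapred.* property names were spelled differently (mapred.mapoutput.key.class, without the dot), but this is how they are named now; see http://hadoop.apache.org/docs/r2.5.2/hadoop-project-dist/hadoop-common/DeprecatedProperties.html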
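
The same check may be worth applying to the other class-related properties in the question, since a misspelled name is silently ignored and the job falls back to its defaults (text output being the default output format). The fragment below is only a sketch, assuming a Hadoop 2.x cluster and using the current names listed in the deprecation table linked above. The values are copied unchanged from the question and have not been verified; some of the package names there (for example `org.apache.hadoop.io.AvroKey`, while the question's own output key class is `org.apache.avro.mapred.AvroKey`) look doubtful and should be checked against the Avro jar on the job's classpath.

```xml
<!-- Sketch only: Hadoop 2.x property names, values copied as-is from the question. -->
<property>
    <!-- the question used mapreduce.input.format.class -->
    <name>mapreduce.job.inputformat.class</name>
    <value>org.apache.avro.mapreduce.AvroKeyInputFormat</value>
</property>
<property>
    <!-- the question used mapreduce.output.format.class -->
    <name>mapreduce.job.outputformat.class</name>
    <value>org.apache.avro.mapred.AvroKeyValueOutputFormat</value>
</property>
<property>
    <!-- the question used mapreduce.output.key.class -->
    <name>mapreduce.job.output.key.class</name>
    <value>org.apache.avro.mapred.AvroKey</value>
</property>
<property>
    <!-- the question used mapreduce.output.value.class -->
    <name>mapreduce.job.output.value.class</name>
    <value>org.apache.avro.mapred.AvroValue</value>
</property>
```

If the output is still plain text after the rename, comparing the effective job configuration (the job.xml of the launched MapReduce job) against these names should show which settings were actually picked up.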