2012-06-14 78 views
1

我一直在嘗試在oozie上運行Avro map-reduce。我在workflow.xml中指定了mapper和reducer類,並提供了其他配置。但是它給出了一個Avro Map-Reduce on oozie

java.lang.RunTime Exception - class mr.sales.avro.etl.SalesMapper not org.apache.hadoop.mapred.Mapper 

在Hadoop集羣上直接運行(而不是通過Oozie的)被完成,並得到所需的輸出相同的工作。所以我可能會錯過一些oozie配置。我從異常中得出的結論是oozie要求映射器是org.apache.hadoop.mapred.Mapper的子類,但是Avro映射器有不同的簽名 - 它們擴展了org.apache.avro.mapred.AvroMapper,這可能是導致錯誤的原因。

所以我的問題是如何配置oozie工作流程/屬性文件以允許它運行Avro map-reduce作業。

回答

1

隨着AVRO,您將需要配置一些額外的屬性:

  • org.apache.avro.mapred.HadoopMapper是你需要設置的實際映射器類(此實現映射器接口)
  • avro.mapper屬性應該命名您的SalesMapper

組合器和減速器還有其他屬性 - 請檢查AvroJob源代碼和實用程序方法。

這樣做的另一種方法是檢查從您手動提交作業的job.xml,以及在相關配置屬性複製到Oozie的workflow.xml

+0

感謝您的答覆。我可以通過oozie使作業運行,但輸出似乎不是二進制avro格式 - 它看起來像一個文本文件,其中有條目[email protected] 你可以建議如果發生了什麼問題?因爲在我手動提交的工作中,輸出看起來不同。我已經在workflow.xml中指定了輸出文件 –

+0

那麼我錯過了指定output.format參數。它在我指定它時起作用。 –

+0

哈斯哈爾可以請你把一個workflow.xml的樣本謝謝 – Richipal

1

我一直在這個星期有同樣的問題。這裏是我的workflow.xml(修改):

<workflow-app name='sample-wf' xmlns="uri:oozie:workflow:0.2"> 
<start to='start_here'/> 
<action name='start_here'> 
    <map-reduce> 
     <job-tracker>${jobTracker}</job-tracker> 
     <name-node>${nameNode}</name-node> 
     <prepare> 
      <delete path="${nameNode}/user/${wf:user()}/output"/> 
     </prepare> 
     <configuration> 
      <property> 
       <name>mapred.input.dir</name> 
       <value>/user/${wf:user()}/input</value> 
      </property> 
      <property> 
       <name>mapred.output.dir</name> 
       <value>/user/${wf:user()}/output</value> 
      </property> 
      <property> 
       <name>mapred.mapper.class</name> 
       <value>org.apache.avro.mapred.HadoopMapper</value> 
      </property> 
      <property> 
       <name>mapred.reducer.class</name> 
       <value>org.apache.avro.mapred.HadoopReducer</value> 
      </property> 
      <property> 
       <name>avro.mapper</name> 
       <value>package.for.my.Mapper</value> 
      </property> 
      <property> 
       <name>avro.reducer</name> 
       <value>package.for.my.Reducer</value> 
      </property> 
      <property> 
       <name>mapred.input.format.class</name> 
       <value>org.apache.avro.mapred.AvroUtf8InputFormat</value> 
      </property> 
      <property> 
       <name>mapred.output.format.class</name> 
       <value>org.apache.avro.mapred.AvroOutputFormat</value> 
      </property> 
      <property> 
       <name>mapred.output.key.class</name> 
       <value>org.apache.avro.mapred.AvroWrapper</value> 
      </property> 
      <property> 
       <name>mapred.mapoutput.key.class</name> 
       <value>org.apache.avro.mapred.AvroKey</value> 
      </property> 
      <property> 
       <name>mapred.mapoutput.value.class</name> 
       <value>org.apache.avro.mapred.AvroValue</value> 
      </property> 
      <property> 
       <name>avro.map.output.schema</name> 
       <value>{put your schema here from job.xml via manual run}</value> 
      </property> 
      <property> 
       <name>avro.input.schema</name> 
       <value>"string"</value> 
      </property> 
      <property> 
       <name>avro.output.schema</name> 
       <value>{put your schema here from job.xml via manual run}</value> 
      </property> 
      <property> 
       <name>mapred.output.key.comparator.class</name> 
       <value>org.apache.avro.mapred.AvroKeyComparator</value> 
      </property> 
      <property> 
       <name>io.serializations</name> 
       <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization</value> 
      </property> 
     </configuration> 
    </map-reduce> 
    <ok to='end'/> 
    <error to='fail'/> 
</action> 
<kill name='fail'> 
    <message>MapReduce failed, error message[$sf:errorMessage(sf:lastErrorNode())}]</message> 
</kill> 
<end name='end'/> 

您可能需要修改這個有點更取決於你的map-reduce任務的輸入和輸出。

0

你可以發佈你的mapper和reducer類嗎?我的oozie工作流程工作正常,但o/p文件不是.avro文件。 這是我的工作流程:

<workflow-app name='sample-wf' xmlns="uri:oozie:workflow:0.2"> 
 
<start to='start_here'/> 
 
<action name='start_here'> 
 
    <map-reduce> 
 
     <job-tracker>${jobTracker}</job-tracker> 
 
     <name-node>${nameNode}</name-node> 
 
     <prepare> 
 
      <delete path="${nameNode}/user/hadoop/${workFlowRoot}/final-output-data"/> 
 
     </prepare> 
 
     <configuration> 
 

 
      <property> 
 
       <name>mapred.job.queue.name</name> 
 
       <value>${queueName}</value> 
 
      </property> 
 
      <property> 
 
        <name>mapred.reducer.new-api</name> 
 
        <value>true</value> 
 
       </property> 
 
       <property> 
 
        <name>mapred.mapper.new-api</name> 
 
        <value>true</value> 
 
       </property> 
 
      <property> 
 
       <name>mapred.input.dir</name> 
 
       <value>/user/hadoop/${workFlowRoot}/input-data</value> 
 
      </property> 
 
      <property> 
 
       <name>mapred.output.dir</name> 
 
       <value>/user/hadoop/${workFlowRoot}/final-output-data</value> 
 
      </property> 
 

 

 
      <property> 
 
       <name>mapreduce.mapper.class</name> 
 
       <value>org.apache.avro.mapred.HadoopMapper</value> 
 
      </property> 
 
      <property> 
 
       <name>mapreduce.reducer.class</name> 
 
       <value>org.apache.avro.mapred.HadoopReducer</value> 
 
      </property> 
 
      <property> 
 
       <name>avro.mapper</name> 
 
       <value>com.flipkart.flap.data.batch.mapred.TestAvro$CFDetectionMapper</value> 
 
      </property> 
 
      <property> 
 
       <name>avro.reducer</name> 
 
       <value>com.flipkart.flap.data.batch.mapred.TestAvro$CFDetectionReducer</value> 
 
      </property> 
 
      <property> 
 
       <name>mapreduce.input.format.class</name> 
 
       <value>org.apache.avro.mapreduce.AvroKeyInputFormat</value> 
 
      </property> 
 
      <property> 
 
       <name>avro.schema.input.key</name> 
 
       <value>{... schema ...}</value> 
 
      </property> 
 
      
 
      <property> 
 
       <name>mapreduce.mapoutput.key.class</name> 
 
       <value>org.apache.hadoop.io.AvroKey</value> 
 
      </property> 
 
      <property> 
 
       <name>avro.map.output.schema.key</name> 
 
       <value>{... schema ...}</value> 
 
      </property> 
 

 
      
 
      <property> 
 
       <name>mapreduce.mapoutput.value.class</name> 
 
       <value>org.apache.hadoop.io.Text</value> 
 
      </property> 
 
      <property> 
 
       <name>mapreduce.output.format.class</name> 
 
       <value>org.apache.avro.mapred.AvroKeyValueOutputFormat</value> 
 
      </property> 
 
      <property> 
 
       <name>mapreduce.output.key.class</name> 
 
       <value>org.apache.avro.mapred.AvroKey</value> 
 
      </property> 
 

 
      <property> 
 
       <name>mapreduce.output.value.class</name> 
 
       <value>org.apache.avro.mapred.AvroValue</value> 
 
      </property> 
 
      
 
      
 
      <property> 
 
       <name>avro.schema.output.key</name> 
 
       <value>{ .... schema .... }</value> 
 
      </property> 
 
      <property> 
 
       <name>avro.schema.output.value</name> 
 
       <value>"string"</value> 
 
      </property> 
 
      <property> 
 
       <name>mapreduce.output.key.comparator.class</name> 
 
       <value>org.apache.avro.mapred.AvroKeyComparator</value> 
 
      </property> 
 
      <property> 
 
       <name>io.serializations</name> 
 
       <value>org.apache.hadoop.io.serializer.WritableSerialization,org.apache.avro.mapred.AvroSerialization 
 
       </value> 
 
      </property> 
 
     </configuration> 
 
    </map-reduce> 
 
    <ok to='end'/> 
 
    <error to='fail'/> 
 
</action> 
 
<kill name='fail'> 
 
    <message>MapReduce failed, error message[$sf:errorMessage(sf:lastErrorNode())}]</message> 
 
</kill> 
 
<end name='end'/> 
 
</workflow-app>

我的映射器和減速是這樣的定義:

public static class CFDetectionMapper extends 
 
       Mapper<AvroKey<AdClickFraudSignalsEntity>, NullWritable, AvroKey<AdClickFraudSignalsEntity>, Text> {} 
 

 
public static class CFDetectionReducer extends 
 
       Reducer<AvroKey<AdClickFraudSignalsEntity>, Text, AvroKey<AdClickFraudSignalsEntity>, AvroValue<CharSequence>>