2013-12-16 83 views
16

我在Spark,我有一個Avro文件的RDD。我現在想要做對RDD一些轉換,並將其保存回爲Avro的文件:Spark:寫Avro文件

val job = new Job(new Configuration()) 
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema)) 

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2)) 
    .saveAsNewAPIHadoopFile(outputPath, 
    classOf[AvroKey[GenericRecord]], 
    classOf[org.apache.hadoop.io.NullWritable], 
    classOf[AvroKeyOutputFormat[GenericRecord]], 
    job.getConfiguration) 

運行此星火抱怨架構$ recordSchema是不可序列。

如果我取消註釋.map調用(並且只有rdd.saveAsNewAPIHadoopFile),則調用成功。

我在這裏做錯了什麼?

有什麼想法?

+0

您能否提供異常堆棧跟蹤? Spark,Hadoop和Avro版本號也可能有用。 – Wildfire

+0

請原諒我的天真。請問在這裏做什麼工作?看起來這是一個地圖減少工作?如果我們使用spark來寫出,爲什麼我們需要map reduce作業? –

回答

2

Spark使用的默認序列化程序是Java序列化。所以對於所有的Java類型,它都會嘗試使用Java序列化進行序列化。 AvroKey不可序列化,所以你會得到錯誤。

您可以在您的自定義序列化(如Avro)中使用KryoSerializer或插件。你可以在這裏閱讀關於序列化的更多信息http://spark-project.org/docs/latest/tuning.html

你也可以用可外部化的東西包裝你的對象。查看例如包含AvroFlumeEvent的SparkFlumeEvent:https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

5

此處的問題與Job中使用的avro.Schema類的非可序列化有關。當您試圖從map函數內的代碼引用架構對象時,會引發異常。

舉例來說,如果你嘗試做如下,您將得到「任務不序列化」例外:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
... 
rdd.map(t => { 
    // reference to the schema object declared outside 
    val record = new GenericData.Record(schema) 
}) 

可以讓一切由剛剛創建模式的新實例工作功能塊內:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.map(t => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 
    val record = new GenericData.Record(innserSchema) 
    ... 
}) 

既然你不喜歡解析爲每次處理記錄的Avro架構,更好的解決方案將是解析劃分級別的模式。以下也可以工作:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.mapPartitions(tuples => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 

    tuples.map(t => { 
    val record = new GenericData.Record(innserSchema) 
    ... 
    // this closure will be bundled together with the outer one 
    // (no serialization issues) 
    }) 
}) 

上面的代碼,只要你提供一種便攜式參考jsonSchema文件中,由於地圖的功能是要由多個遠程執行人被執行的工作原理。它可以是對HDFS中文件的引用,也可以與JAR中的應用程序一起打包(在後一種情況下,您將使用類加載器函數來獲取其內容)。

對於那些誰嘗試使用Avro的星火,發現仍存在一些未解決的編譯問題,你必須使用Maven的POM以下導入:

<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro-mapred</artifactId> 
    <version>1.7.7</version> 
    <classifier>hadoop2</classifier> 
<dependency> 

注意"hadoop2"分類。您可以在https://issues.apache.org/jira/browse/SPARK-3039上跟蹤問題。

+0

當我們的map函數中沒有外部依賴關係時,此方法正常工作。有什麼方法可以使模式序列化? – COSTA