2015-10-16 18 views
3

我想用Spark 1.5.1(和Scala 2.10.2)來讀取一些.avro文件HDFS(spark-avro 1.7.7),以便對它們進行一些計算。

現在,我已經搜查徹底的網上找到解決辦法的假設(也是迄今爲止最好的鏈接開始被this one這表明使用GenericRecord,而this one報告了同樣的問題,並this one少了點爲我工作,因爲它提供了幾乎相同的代碼我在這裏問,因爲這可能是有人有相同的。這是代碼:

import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} 
import org.apache.hadoop.io.NullWritable 
import org.apache.spark.{SparkConf, SparkContext} 

object SparkPOC { 

    def main(args: Array[String]): Unit ={ 

    val conf = new SparkConf() 
     .setAppName("SparkPOC") 
     .set("spark.master", "local[4]") 
    val sc = new SparkContext(conf) 
    val path = args(0) 
    val profiles = sc.hadoopFile(
     path, 
     classOf[AvroInputFormat[MyRecord]], 
     classOf[AvroWrapper[MyRecord]], 
     classOf[NullWritable] 
    ) 

    val timeStamps = profiles.map{ p => p._1.datum.getTimeStamp().toString} 
    timeStamps.foreach(print) 

} 

而且我得到以下信息:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord 
    at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24) 
    at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:744) 

是否有人有線索?我也在考慮使用spark-avro的可能性,但它們不支持同時從多個文件讀取(而.hadoopFile支持通配符)。否則,似乎我必須去GenericRecord並使用.get方法,從而失去了編碼模式(MyRecord)的優勢。

在此先感謝。

回答

0

後,我已經設置KryoSerializer和spark.kryo.registrator類的問題已經沒有了,如下:

val config = new SparkConf() 
    .setAppName(appName) 
    .set("spark.master", master) 
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    .set("spark.kryo.registrator", "com.mypackage.AvroKryoRegistrator") 

其中AvroKryoRegistrator是類似於this

2

我通常把它讀作GenericRecord和顯式轉換爲必要,即

val conf = sc.hadoopConfiguration 
sc.newAPIHadoopFile(path, classOf[AvroKeyInputFormat[GenericRecord]], classOf[AvroKey[GenericRecord]], classOf[NullWritable], conf).map(_._1.datum().asInstanceOf[MyRecord]) 
+0

非常感謝您的回答。但是,在使用KryoSerializer並在spark上下文中設置spark.kryo.registrator後,問題消失了。我不完全知道原因 - 可能是我正在使用的某個庫中的錯誤,或者...... boh(?),但它現在可用。 – Markon

相關問題