我想用Spark 1.5.1(和Scala 2.10.2)來讀取一些.avro文件HDFS(spark-avro 1.7.7),以便對它們進行一些計算。
現在,我已經搜查徹底的網上找到解決辦法的假設(也是迄今爲止最好的鏈接開始被this one這表明使用GenericRecord,而this one報告了同樣的問題,並this one少了點爲我工作,因爲它提供了幾乎相同的代碼我在這裏問,因爲這可能是有人有相同的。這是代碼:
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}
object SparkPOC {
def main(args: Array[String]): Unit ={
val conf = new SparkConf()
.setAppName("SparkPOC")
.set("spark.master", "local[4]")
val sc = new SparkContext(conf)
val path = args(0)
val profiles = sc.hadoopFile(
path,
classOf[AvroInputFormat[MyRecord]],
classOf[AvroWrapper[MyRecord]],
classOf[NullWritable]
)
val timeStamps = profiles.map{ p => p._1.datum.getTimeStamp().toString}
timeStamps.foreach(print)
}
而且我得到以下信息:
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord
at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24)
at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
是否有人有線索?我也在考慮使用spark-avro的可能性,但它們不支持同時從多個文件讀取(而.hadoopFile支持通配符)。否則,似乎我必須去GenericRecord並使用.get方法,從而失去了編碼模式(MyRecord)的優勢。
在此先感謝。
非常感謝您的回答。但是,在使用KryoSerializer並在spark上下文中設置spark.kryo.registrator後,問題消失了。我不完全知道原因 - 可能是我正在使用的某個庫中的錯誤,或者...... boh(?),但它現在可用。 – Markon