我創建了一個JSON數據,併爲它的Avro的模式:如何從HDFS中檢索Avro數據?
{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }
和
{ 「類型」: 「記錄」, 「名」: 「twitter_schema」 「namespace」:「com.miguno.avro」,「fields」:[{「012」:「name」:「username」, 「type」:「string」, 「doc」:「 com「},{ 」name「:」tweet「, 」type「:」string「, 」do c「:」用戶的Twitter消息的內容「},{ 」name「:」timestamp「, 」type「:」long「, 」doc「:」Unix epoch time in seconds「}]」doc :」: 「用於存儲的Twitter消息的基本模式」}
我然後將其轉化爲阿夫羅如下:
java -jar ~/avro-tools-1.7.4.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
將文件上HDFS與此:
hadoop fs -copyFromLocal twitter.avro <path>
做的時候
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
但是:
avroRDD.first
我面臨以下異常:
org.apache.spark
,然後在星火CLI出具的本人代碼。 SparkException:由於階段失敗導致作業中止: 階段7.0(TID 13)中的任務2.0具有不可序列化的結果: org.apache.avro.mapred.AvroWrapper at org.apache.spark.scheduler.DAGScheduler.org $ apache $ spark $ scheduler $ DAGScheduler $$ failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala:1174) 在 org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.適用(DAGScheduler.scala:1173) 在 scala.collection.mutable.ResizableArray $ class.foreach(ResizableArray .scala:59) 在scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
什麼是它的解決方案?