import java.io.File

import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter}

val file = File.createTempFile("temp", ".avro")
val schema = new Schema.Parser().parse(st)
val datumWriter = new GenericDatumWriter[GenericData.Record](schema)
val dataFileWriter = new DataFileWriter[GenericData.Record](datumWriter)
dataFileWriter.create(schema, file)
// dataFileWriter is created on the driver but captured by the closure below,
// so Spark tries (and fails) to serialize it when shipping the task
rdd.foreach(r => {
    dataFileWriter.append(r)
})
dataFileWriter.close()

I want to write a DStream of GenericData.Record to HDFS in Avro format, but I get this Task not serializable error when the task tries to write the RDD of generic records:

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2062) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) 
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:217) 
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:210) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
at scala.util.Try$.apply(Try.scala:161) 
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.avro.file.DataFileWriter 
Serialization stack: 
- object not serializable (class: org.apache.avro.file.DataFileWriter, value: org.apache.avro.file.DataFileWriter@...) 
- field (class: KafkaCo$$anonfun$main$3$$anonfun$apply$1, name: dataFileWriter$1, type: class org.apache.avro.file.DataFileWriter) 
- object (class KafkaCo$$anonfun$main$3$$anonfun$apply$1, <function1>) 
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 

What are you trying to achieve by writing the RDD objects to an Avro file? You should have a look at https://github.com/databricks/spark-avro, which lets you use 'df.write.avro("/tmp/output")' –

Answers

1

Since the lambdas have to be distributed around the cluster in order to run, they can only reference serializable data, so that they can be serialized, shipped to the different executors, and the task executed there.

What you could probably do instead is:

  • use the mapPartitions method (rather than map) so that you work one partition at a time
  • for each partition, create a new file and get a handle to it
  • create a new writer from that partition's file handle and append every message in the partition to the file
  • make sure the file handle is closed once the stream is fully consumed (see the sketch right after this list)
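
A minimal sketch of that idea, using foreachPartition (the side-effecting counterpart of mapPartitions); `st` is assumed to be the schema string from the question:

rdd.foreachPartition { partition =>
  // this block runs on the executors, so the writer never has to be serialized
  val file = File.createTempFile("partition", ".avro")
  val schema = new Schema.Parser().parse(st)
  val datumWriter = new GenericDatumWriter[GenericData.Record](schema)
  val dataFileWriter = new DataFileWriter[GenericData.Record](datumWriter)
  dataFileWriter.create(schema, file)
  try {
    partition.foreach(record => dataFileWriter.append(record))
  } finally {
    dataFileWriter.close()
  }
}

Note that, as discussed in the comments below, each file ends up on the executor's local file system rather than on HDFS.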

'Something like this saves the DataFrame directly to a file in Avro format' – where do I use map here? Some pseudocode would be very helpful – JSR29


While valid advice in general, I don't think mapPartitions and local objects will help in this case. Note how this dataFileWriter is bound to a local file: dataFileWriter.create(schema, file). With mapPartitions we would create a local file on each executor. – maasg


That's why I suggested passing around a file handle to be closed when the stream is fully consumed, rather than the current mode of operation. I hope to find the time today to edit my answer with some code. – stefanobaghino

2

The key here is that DataFileWriter is a local resource (bound to a local file), so serializing it does not make sense.

Modifying the code to use something like mapPartitions won't help either, because such an executor-bound approach would write the files to each executor's local file system.

We need to use an implementation that embraces Spark's distributed nature, for example https://github.com/databricks/spark-avro.
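
For reference, a minimal sbt sketch to pull that library in (the version shown is an assumption and has to match your Spark release):

// build.sbt: the Databricks spark-avro artifact; the version is illustrative
libraryDependencies += "com.databricks" %% "spark-avro" % "3.2.0"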

Using that library:

Given a case class representing the schema, we would do:

import com.databricks.spark.avro._   // adds the .avro(...) method on DataFrameWriter
// toDF() also needs the SQLContext/SparkSession implicits in scope
val structuredRDD = rdd.map(record => recordToSchema(record))
val df = structuredRDD.toDF()
df.write.avro(hdfs_path)

What is recordToSchema? – JSR29


A function that you write to convert your record format into your case class. – maasg
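
For illustration only, a sketch of what recordToSchema could look like; the Event case class and its fields are hypothetical, since the actual schema is not shown in the question:

// hypothetical case class mirroring the Avro schema's fields
case class Event(id: String, value: Long)

def recordToSchema(record: GenericData.Record): Event =
  Event(
    record.get("id").toString,             // Avro strings come back as Utf8, so convert explicitly
    record.get("value").asInstanceOf[Long]
  )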


Can you provide any example, because I haven't ... – JSR29
