I get an exception: Spark Kafka producer serialization (Task not serializable)
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:889)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:888)
    at com.Boot$.test(Boot.scala:60)
    at com.Boot$.main(Boot.scala:36)
    at com.Boot.main(Boot.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@...)
    - field (class: com.Boot$$anonfun$test$1, name: producer$1, type: class org.apache.kafka.clients.producer.KafkaProducer)
    - object (class com.Boot$$anonfun$test$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// @transient
val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// @transient
val sc = new SparkContext(sparkConf)
val requestSet: RDD[String] = sc.textFile(s"hdfs:/user/bigdata/ADVERTISE-IMPRESSION-STAT*/*")

// @transient
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, NearLineConfig.kafka_brokers)
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put("producer.type", "async")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152")

// @transient
// Producer is created on the driver, then referenced inside the closure below.
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)

requestSet.foreachPartition((partitions: Iterator[String]) => {
  partitions.foreach((line: String) => {
    try {
      producer.send(new ProducerRecord[String, String]("testtopic", line))
    } catch {
      case ex: Exception =>
        log.warn(ex.getMessage, ex)
    }
  })
})

producer.close()
In this program I try to read records from an HDFS path and send them to Kafka. The problem is that when I remove the code that sends records to Kafka, it runs fine. What am I missing?
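The "Caused by" line points at the cause: the closure passed to foreach captures the driver-side producer field, and KafkaProducer is not serializable, so Spark cannot ship the closure to executors. The usual fix is to create the producer inside foreachPartition, so each executor builds its own instance and nothing non-serializable is captured. A minimal sketch, reusing the same props map and topic name as above:

requestSet.foreachPartition((partition: Iterator[String]) => {
  // Created on the executor, once per partition; the java.util.HashMap of
  // string configs is serializable, so capturing props is fine.
  val producer = new KafkaProducer[String, String](props)
  partition.foreach((line: String) => {
    try {
      producer.send(new ProducerRecord[String, String]("testtopic", line))
    } catch {
      case ex: Exception => log.warn(ex.getMessage, ex)
    }
  })
  // Close per partition so buffered records are flushed before the task ends.
  producer.close()
})

Note that anything else referenced inside the closure (the log field, for example) must also be serializable, or likewise be created inside the partition function, or it will trigger the same error.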
Thanks a lot. I changed my code the way you suggested, and it works. –
@Steven.Prgm What did he mention? –
By the way, one of my colleagues mentioned driver variables and executor variables. Is there any difference between them? See the sketch below. –
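Roughly: code outside an RDD closure runs in the driver JVM, while the function passed to an action such as foreach or foreachPartition is serialized and shipped to executor JVMs. Any driver-side variable captured by that closure must therefore be serializable; KafkaProducer is not, which is why it has to be created inside foreachPartition. A hypothetical sketch (buildResource stands in for any non-serializable resource and is not a real API):

val resource = buildResource()          // driver side: runs once, in the driver JVM
rdd.foreachPartition { partition =>     // executor side: this closure is serialized and shipped
  // Anything captured from outside (e.g. `resource`) must be serializable.
  // A non-serializable resource should instead be created right here,
  // once per partition, on the executor:
  val localResource = buildResource()   // hypothetical constructor
  partition.foreach(record => localResource.process(record)) // hypothetical method
  localResource.close()
}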