Spark Kafka streaming exception: object not serializable (ConsumerRecord)
I am running a Spark Streaming reader for Kafka.
These are the dependencies:
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.1</version>
  </dependency>
</dependencies>
When some data, say "Hi----3", is produced to the Kafka topic, I get the following exception (although I can see the data inside the exception):
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = q_metrics, partition = 0, offset = 26, CreateTime = 1480588636828, checksum = 3939660770, serialized key size = -1, serialized value size = 9, key = null, value = "Hi----3"))
I am not doing any computation on the RDD (that throws the same exception as well). Even stream.print() throws the exception.
Here is the code:
import org.apache.spark.streaming._
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.rdd.RDD
class Metrics {
  def readKafka() {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))
    val topics = Array("q_metrics")
    val sc = new SparkContext("local[4]", "ScalaKafkaConsumer")
    val streamingContext = new StreamingContext(sc, Seconds(10))
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams))
    stream.print()
    streamingContext.start
    streamingContext.awaitTermination()
  }

  def rddReader(rdd: Array[String]) = {
  }
}

object MetricsReader {
  def main(args: Array[String]): Unit = {
    val objMetrics = new Metrics()
    objMetrics.readKafka()
  }
}
Any help is appreciated.
Thanks
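Are you able to receive the messages on the Kafka console consumer? – user4342532
No. I only see the message as part of the exception. – Raaghu
I think you need to add some jars to your kafka/lib location, such as metrics-core and kafka-clients (if they are not already there). – user4342532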
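One possible direction, offered as a sketch rather than a confirmed fix for this setup: ConsumerRecord does not implement java.io.Serializable, and stream.print() brings the first elements of each batch back to the driver, which requires serializing them. Mapping each record to its key and value before any output operation avoids shipping the ConsumerRecord itself. The object name MetricsReaderSketch and the helper toPair below are hypothetical names introduced for illustration; the topic, broker address and other parameters are copied from the question.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Hypothetical object name, used only for this sketch.
object MetricsReaderSketch {

  // Hypothetical helper: keep only the key and value, which are plain
  // Strings and therefore serializable, and drop the ConsumerRecord wrapper.
  def toPair(record: ConsumerRecord[String, String]): (String, String) =
    (record.key, record.value)

  def main(args: Array[String]): Unit = {
    // Same consumer settings as in the question.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val conf = new SparkConf().setMaster("local[4]").setAppName("ScalaKafkaConsumer")
    val streamingContext = new StreamingContext(conf, Seconds(10))

    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](Array("q_metrics"), kafkaParams))

    // Convert to (key, value) pairs before print(), so only Strings
    // are sent back to the driver.
    stream.map(record => toPair(record)).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
```

With this change, print() works on a DStream of (String, String) tuples instead of ConsumerRecord objects. The same mapping would be needed before any other operation that moves records between executors or to the driver, such as a shuffle, a window, or caching in serialized form.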