My project uses ZooKeeper, Kafka, and Spark Streaming. The problem is that zkClient cannot be serialized when I try to write Kafka offsets to ZooKeeper from Spark Streaming. I have looked at several GitHub projects, such as https://github.com/ippontech/spark-kafka-source:
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.HasOffsetRanges
import kafka.utils.ZkUtils

// save the offsets after each batch
kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd))

def saveOffsets(topic: String, rdd: RDD[_]): Unit = {
  logger.info("Saving offsets to ZooKeeper")
  val stopwatch = new Stopwatch()
  // Offset ranges for this batch, available on the driver
  val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetsRanges.foreach(offsetRange => logger.debug(s"Using ${offsetRange}"))
  // Encoded as "partition:fromOffset,partition:fromOffset,..."
  val offsetsRangesStr = offsetsRanges
    .map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}")
    .mkString(",")
  logger.debug(s"Writing offsets to ZooKeeper: ${offsetsRangesStr}")
  // The call in question: it uses the driver-side zkClient
  ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)
  logger.info("Done updating offsets in ZooKeeper. Took " + stopwatch)
}
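For context, zkClient, zkPath, and the logger live in the offsetsStore object, which is instantiated once in the driver JVM. A minimal sketch of what that object might look like (zkHosts and zkPath below are placeholder values, not from the actual project):

import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer

object offsetsStore {
  // Placeholder connection settings, assumed for illustration
  private val zkHosts = "localhost:2181"
  private val zkPath = "/my-app/kafka-offsets"

  // Created once on the driver; never shipped to the executors, so it
  // does not need to be serializable
  private val zkClient = new ZkClient(zkHosts, 30000, 30000, ZKStringSerializer)

  // def saveOffsets(topic: String, rdd: RDD[_]): Unit = { ... } as above
}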
As the code shows, kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd)) is executed on the driver, and private val zkClient = new ZkClient(zkHosts, 30000, 30000, ZKStringSerializer) is declared inside the object offsetsStore. But zkClient cannot be serialized, so how does this work?
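My understanding so far (standard Spark behavior, sketched below): the function passed to foreachRDD runs in the driver JVM once per batch, and only closures passed to RDD operations such as map or foreach are serialized and shipped to the executors:

kafkaStream.foreachRDD { rdd =>
  // Runs on the driver once per batch interval; zkClient is only
  // referenced here, so Spark never tries to serialize it
  offsetsStore.saveOffsets(topic, rdd)

  rdd.foreach { record =>
    // Only this inner closure is serialized and sent to the executors;
    // referencing zkClient here would throw NotSerializableException
  }
}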
kafkaStream.foreachRDD(rdd => { rdd.foreachPartition(x => offsetsStore.saveOffsets(topic, rdd)) }) would also work, but then foreachPartition would execute saveOffsets several times, once per partition. –
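If the intent is to do per-partition work on the executors and still write the offsets only once per batch, the save can stay at the foreachRDD level, e.g. (a sketch; process is a hypothetical per-record function):

kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Executor side: runs once per partition
    records.foreach(record => process(record)) // process is hypothetical
  }
  // Driver side: exactly one ZooKeeper write per batch, regardless of
  // the number of partitions
  offsetsStore.saveOffsets(topic, rdd)
}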