2017-05-27 47 views
0

我的項目包括ZooKeeper,Kafka和Spark Streaming。問題是zkClient當我嘗試使用Spark Streaming向ZooKeeper寫入Kafka偏移量時,無法序列化。我見過幾個GitHub的項目,如:https://github.com/ippontech/spark-kafka-sourcezkClient不能被Serializabled,sparkstreaming寫kafka偏移量到zookeeper

//save the offsets 

kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(topic, rdd)) 

def saveOffsets(topic: String, rdd: RDD[_]): Unit = { 

    logger.info("Saving offsets to ZooKeeper") 
    val stopwatch = new Stopwatch() 

    val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    offsetsRanges.foreach(offsetRange => logger.debug(s"Using ${offsetRange}")) 

    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.fromOffset}").mkString(",") 
    logger.debug(s"Writing offsets to ZooKeeper: ${offsetsRangesStr}") 
    **ZkUtils.updatePersistentPath(zkClient, zkPath, offsetsRangesStr)** 

    logger.info("Done updating offsets in ZooKeeper. Took " + stopwatch) 

} 

正如代碼:kafkaStream.foreachRDD(rdd => offsetsStore.saveOffsets(rdd))將司機private val zkClient = new ZkClient(zkHosts, 30000, 30000,ZKStringSerializer)在對象offsetStore執行,但zkClient無法序列,它是如何工作的?

+0

kafkaStream.foreachRDD(RDD => { rdd.foreachPartition(X => offsetsStore.saveOffsets(RDD)) })這會是好的,但foreachpartition將作爲分區號執行severl次數 –

回答

0

可以定義zkClient@transient lazy val,這意味着它不會驅動器和執行器(這是@transient一部分)之間被序列化,而是將被重新初始化時,在每一個和所述類的每個實例,其包含上述代碼(這是lazy部分)。

你可以閱讀更多關於這個模式在這裏: http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/

+0

ty :)我已經嘗試過這種方式,但失敗了,現在它的工作,所以這意味着初始化zkClient只是在驅動程序中執行? –

+0

這絕對意味着zkClient不會被序列化。 我並不是100%確定遍歷'offsetsetsRanges'的代碼在哪裏運行。 在任何情況下,運行此代碼的每個位置都將初始化一個單獨的zkClient實例。 希望這有助於:-) – Alexey

+0

只有在驅動程序它可以得到的RDD不RDD分區,所以offsetsetsRanges只能在驅動程序中執行,我認爲這就是爲什麼zkClient不需要序列化。 –

相關問題