我使用DirectKafkaStream
API 1從卡夫卡讀取數據,進行一些轉換,更新計數,然後將數據寫回卡夫卡。其實這個代碼和平是在測試:Spark DStream使用轉換週期性地調用saveAsObjectFile不能按預期工作
kafkaStream[Key, Value]("test")
.map(record => (record.key(), 1))
.updateStateByKey[Int](
(numbers: Seq[Int], state: Option[Int]) =>
state match {
case Some(s) => Some(s + numbers.length)
case _ => Some(numbers.length)
}
)
.checkpoint(this)("count") {
case (save: (Key, Int), current: (Key, Int)) =>
(save._1, save._2 + current._2)
}
.map(_._2)
.reduce(_ + _)
.map(count => (new Key, new Result[Long](count.toLong)))
.toKafka(Key.Serializer.getClass.getName, Result.longKafkaSerializer.getClass.getName)
的checkpoint
運算符是一個濃縮的DStream
API我已經創建,應該使用saveAsObjectFile
幾乎保存給出一個Time
的DStream
的一個RDD到HDFS。實際上,它將每第60個微量批次(RDD)的結果保存到HDFS中。
檢查站執行以下操作:
def checkpoint(processor: Streaming)(name: String)(
mergeStates: (T, T) => T): DStream[T] = {
val path = processor.configuration.get[String](
"processing.spark.streaming.checkpoint-directory-prefix") + "/" +
Reflection.canonical(processor.getClass) + "/" + name + "/"
logInfo(s"Checkpoint base path is [$path].")
processor.registerOperator(name)
if (processor.fromCheckpoint && processor.restorationPoint.isDefined) {
val restorePath = path + processor.restorationPoint.get.ID.stringify
logInfo(s"Restoring from path [$restorePath].")
checkpointData = context.objectFile[T](restorePath).cache()
stream
.transform((rdd: RDD[T], time: Time) => {
val merged = rdd
.union(checkpointData)
.map[(Boolean, T)](record => (true, record))
.reduceByKey(mergeStates)
.map[T](_._2)
processor.maybeCheckpoint(name, merged, time)
merged
}
)
} else {
stream
.transform((rdd: RDD[T], time: Time) => {
processor.maybeCheckpoint(name, rdd, time)
rdd
})
}
}
的有效的代碼如下:
dstream.transform((rdd: RDD[T], time: Time) => {
processor.maybeCheckpoint(name, rdd, time)
rdd
})
如果上述代碼dstream
變量是先前的操作者,這是updateStateByKey
的結果,因此在updateStateByKey
之後立即調用變換。
def maybeCheckpoint(name: String, rdd: RDD[_], time: Time) = {
if (doCheckpoint(time)) {
logInfo(s"Checkpointing for operator [$name] with RDD ID of [${rdd.id}].")
val newPath = configuration.get[String](
"processing.spark.streaming.checkpoint-directory-prefix") + "/" +
Reflection.canonical(this.getClass) + "/" + name + "/" + checkpointBarcode
logInfo(s"Saving new checkpoint to [$newPath].")
rdd.saveAsObjectFile(newPath)
registerCheckpoint(name, Operator(name), time)
logInfo(s"Checkpoint completed for operator [$name].")
}
}
正如你看到的大部分代碼只是記賬,但saveAsObjectFile
有效地叫。
問題是即使從updateStateByKey
產生的RDD應該自動保留,當每個第X個微批次調用saveAsObjectFile
時,Spark將重新計算從頭開始,從流式作業開始時開始的所有事情通過再次閱讀卡夫卡的一切。我試圖在DStream和RDDs上放置並強制使用不同級別的存儲器,cache
或persist
。
微批次:
DAG作業22:
DAG作業運行saveAsObjectFile
:
可能是什麼問題?
謝謝!
1使用Spark 2.1.0。
這不是'saveAsObjectFile'在這裏是壞男孩,一個簡單的'count'做同樣的事情, – Dyin
通過調用'saveAsObjectFile'告訴spark執行一個動作,它執行初始流上定義的所有轉換(記住:轉化是懶惰的)。之後執行其他操作時(例如'reduce'),將再次執行相同的轉換。爲了防止你可以在第一個動作之前調用'dstream.cache()'(即'checkpoint(this)(...)') –
@AdiGerber我已經試過了,當然了。就在檢查點豐富之前,緩存甚至試圖堅持使用磁盤存儲,以確保只有內存的存儲級別不會強迫RDD被驅逐。無論如何,我只會「重複使用」最後一次緩存的RDD,它的大小隻有4KB - 用少量的數據進行測試 - 基數爲10的300.000個鍵值記錄.10個鍵實際上由'updateStateByKey'存儲。 – Dyin