3
在火花流應用程序中維護應用程序狀態的最佳方法是什麼?如何在Spark Streaming中構建查找映射?
我知道的方法有兩種:
- 使用「聯盟」操作追加到查找RDD和各工會後能持續它。
- 將狀態保存在文件或數據庫中,並將其加載到每批的開始位置。
我的問題是從性能的角度來看哪個更好?另外,有沒有更好的方法來做到這一點?
在火花流應用程序中維護應用程序狀態的最佳方法是什麼?如何在Spark Streaming中構建查找映射?
我知道的方法有兩種:
我的問題是從性能的角度來看哪個更好?另外,有沒有更好的方法來做到這一點?
你真的應該使用mapWithState(spec: StateSpec[K, V, StateType, MappedType])如下:
import org.apache.spark.streaming.{ StreamingContext, Seconds }
val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
// checkpointing is mandatory
ssc.checkpoint("_checkpoints")
val rdd = sc.parallelize(0 to 9).map(n => (n, n % 2 toString))
import org.apache.spark.streaming.dstream.ConstantInputDStream
val sessions = new ConstantInputDStream(ssc, rdd)
import org.apache.spark.streaming.{State, StateSpec, Time}
val updateState = (batchTime: Time, key: Int, value: Option[String], state: State[Int]) => {
println(s">>> batchTime = $batchTime")
println(s">>> key = $key")
println(s">>> value = $value")
println(s">>> state = $state")
val sum = value.getOrElse("").size + state.getOption.getOrElse(0)
state.update(sum)
Some((key, value, sum)) // mapped value
}
val spec = StateSpec.function(updateState)
val mappedStatefulStream = sessions.mapWithState(spec)
mappedStatefulStream.print()
你看看在updateStateByKey?在http://spark.apache.org/docs/latest/streaming-programming-guide.html查找它並嘗試一下這個例子,看看它是否適合你的需求 – ccheneson
是的,我看了一下,但無法弄清楚我怎樣才能在我的情況下使用它的狀態是一個key和value對的映射,其中value是一個用戶對象。現在,我想要更新緩存中的用戶對象的每一個用戶活動流。 – Soumitra