
I am learning Spark and trying to build a simple streaming service: stateful stream processing with Spark.

For example, I have a Kafka queue and a Spark job along the lines of a word count. That example runs in stateless mode. I want to accumulate the word counts, so that if test has been sent several times in different messages, I can get the total number of all its occurrences.

Using the StatefulNetworkWordCount example as a reference, I have been trying to modify my Kafka streaming service:

val sc = new SparkContext(sparkConf) 
val ssc = new StreamingContext(sc, Seconds(2)) 

ssc.checkpoint("/tmp/data") 

// Create direct kafka stream with brokers and topics 
val topicsSet = topics.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

// Get the lines, split them into words, count the words and print 
val lines = messages.map(_._2) 
val words = lines.flatMap(_.split(" ")) 

val wordDstream = words.map(x => (x, 1)) 

// Update the cumulative count using mapWithState 
// This will give a DStream made of state (which is the cumulative count of the words) 
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => { 
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0) 
    val output = (word, sum) 
    state.update(sum) 
    output 
} 

val stateDstream = wordDstream.mapWithState(
    StateSpec.function(mappingFunc) /*.initialState(initialRDD)*/) 

stateDstream.print() 

stateDstream.map(s => (s._1, s._2.toString)).foreachRDD(rdd => sc.toRedisZSET(rdd, "word_count", 0)) 

// Start the computation 
ssc.start() 
ssc.awaitTermination() 

I get a lot of errors like:

17/03/26 21:33:57 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped 
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable 
org.apache.spark.SparkContext 
Serialization stack: 
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@...) 
    - field (class: com.DirectKafkaWordCount$$anonfun$main$2, name: sc$1, type: class org.apache.spark.SparkContext) 
    - object (class com.DirectKafkaWordCount$$anonfun$main$2, <function1>) 
    - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1) 

while the stateless version works fine without errors:

val sc = new SparkContext(sparkConf) 
val ssc = new StreamingContext(sc, Seconds(2)) 

// Create direct kafka stream with brokers and topics 
val topicsSet = topics.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet) 

// Get the lines, split them into words, count the words and print 
val lines = messages.map(_._2) 
val words = lines.flatMap(_.split(" ")) 
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _).map(s => (s._1, s._2.toString)) 
wordCounts.print() 

wordCounts.foreachRDD(rdd => sc.toRedisZSET(rdd, "word_count", 0)) 

// Start the computation 
ssc.start() 
ssc.awaitTermination() 

The question now is how to make the word count in this streaming job stateful.

+0

Do you really need checkpointing? You could fix it by removing the `ssc.checkpoint("/tmp/data")` line, see the [explanation](https://forums.databricks.com/questions/382/why-is-my-spark-streaming-application-throwing-an.html) – dk14

+2

Since [Spark 2.2.0](http://spark.apache.org/news/spark-2-2-released.html) was released, you should seriously consider building [stateful stream processing](http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#arbitrary-stateful-operations) with Structured Streaming (as in [Faster Stateful Stream Processing in Apache Spark Streaming](https://databricks.com/blog/2016/02/01/faster-stateful-stream-processing-in-apache-spark-streaming.html)). – Jacek Laskowski
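
For reference, a minimal sketch of that suggestion, assuming Spark 2.2+ with the spark-sql-kafka-0-10 source and reusing the brokers/topics values from the question; the running per-word total is kept with mapGroupsWithState instead of mapWithState:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.GroupState

val spark = SparkSession.builder.appName("StructuredStatefulWordCount").getOrCreate()
import spark.implicits._

// Read the Kafka topics as a streaming Dataset of message values
val lines = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)   // same broker list as in the question
    .option("subscribe", topics)                  // same comma-separated topic list
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]

// Keep a running total per word across micro-batches
val counts = lines
    .flatMap(_.split(" "))
    .groupByKey(identity)
    .mapGroupsWithState[Long, (String, Long)] {
        (word: String, batch: Iterator[String], state: GroupState[Long]) =>
            val total = state.getOption.getOrElse(0L) + batch.size
            state.update(total)
            (word, total)
    }

counts.writeStream
    .outputMode("update")   // mapGroupsWithState requires the update output mode
    .format("console")
    .start()
    .awaitTermination()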

+0

@JacekLaskowski Thanks! I will check it out. – cyrillk

Answer

1

On this line:

ssc.checkpoint("/tmp/data") 

you enabled checkpointing, which means that everything inside your:

wordCounts.foreachRDD(rdd => sc.toRedisZSET(rdd, "word_count", 0)) 

has to be serializable, and sc itself is not, as you can see from the error message:

object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@...) 

Removing the checkpointing line will help with that.
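
Note that mapWithState itself needs a checkpoint directory, so if checkpointing has to stay, a rough sketch of a workaround is to stop capturing the driver-side sc in the closure and reach the context through the RDD instead (this assumes the same com.redislabs.provider.redis._ implicits the question's snippet relies on):

stateDstream.map(s => (s._1, s._2.toString)).foreachRDD { rdd =>
    // rdd.sparkContext is resolved on the driver when the batch runs, so the
    // closure no longer drags the outer, non-serializable SparkContext into
    // the checkpointed DStream graph
    rdd.sparkContext.toRedisZSET(rdd, "word_count", 0)
}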

Another way would be to write the data from each RDD of the DStream directly to Redis, something like:

wordCounts.foreachRDD { rdd =>
    rdd.foreachPartition(partition => RedisContext.setZset("word_count", partition, ttl, redisConfig))
}

RedisContext is a serializable object that does not depend on SparkContext.

See also: https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala