1
我使用火花卡夫卡集成0.10和我需要在流聚合的兩個級別:卡夫卡火花流多個聚合
- 第一個是每分鐘間隔
- 另一種是第15分鐘總結間隔。
另外偏好是累積間隔1分鐘值,然後重置它當第15分鐘結束時的B/C 15的分鐘值應保持。
在不同的滑動窗口上有兩個reduceByKeysByWindow
s不起作用,因爲它會給出KafkaConcurrentModification
例外。
我使用火花卡夫卡集成0.10和我需要在流聚合的兩個級別:卡夫卡火花流多個聚合
另外偏好是累積間隔1分鐘值,然後重置它當第15分鐘結束時的B/C 15的分鐘值應保持。
在不同的滑動窗口上有兩個reduceByKeysByWindow
s不起作用,因爲它會給出KafkaConcurrentModification
例外。
tl; dr它似乎工作。請提供一個失敗的例子。
我正在使用Spark 2.0.2(已發佈today)。
我的例子如下(有一些代碼簡潔,刪除):
val ssc = new StreamingContext(sc, Seconds(10))
import org.apache.spark.streaming.kafka010._
val dstream = KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))
def reduceFunc(v1: String, v2: String) = s"$v1 + $v2"
dstream.map { r =>
println(s"value: ${r.value}")
val Array(key, value) = r.value.split("\\s+")
println(s">>> key = $key")
println(s">>> value = $value")
(key, value)
}.reduceByKeyAndWindow(
reduceFunc, windowDuration = Seconds(30), slideDuration = Seconds(10))
.print()
dstream.foreachRDD { rdd =>
// Get the offset ranges in the RDD
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
}
}
ssc.start
你會改變什麼,看看您所遇到的異常(S)?
整個項目可用spark-streaming-kafka-direct。
請包括您現在使用的代碼(https://stackoverflow.com/help/mcve)。 – 2016-11-14 04:50:38