2016-11-14 134 views
1

我使用火花卡夫卡集成0.10和我需要在流聚合的兩個級別:卡夫卡火花流多個聚合

  1. 第一個是每分鐘間隔
  2. 另一種是第15分鐘總結間隔。

另外偏好是累積間隔1分鐘值,然後重置它當第15分鐘結束時的B/C 15的分鐘值應保持。

在不同的滑動窗口上有兩個reduceByKeysByWindow s不起作用,因爲它會給出KafkaConcurrentModification例外。

+1

請包括您現在使用的代碼(https://stackoverflow.com/help/mcve)。 – 2016-11-14 04:50:38

回答

0

tl; dr它似乎工作。請提供一個失敗的例子。

我正在使用Spark 2.0.2(已發佈today)。

我的例子如下(有一些代碼簡潔,刪除):

val ssc = new StreamingContext(sc, Seconds(10)) 
import org.apache.spark.streaming.kafka010._ 

val dstream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    preferredHosts, 
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)) 

def reduceFunc(v1: String, v2: String) = s"$v1 + $v2" 
dstream.map { r => 
    println(s"value: ${r.value}") 
    val Array(key, value) = r.value.split("\\s+") 
    println(s">>> key = $key") 
    println(s">>> value = $value") 
    (key, value) 
}.reduceByKeyAndWindow(
    reduceFunc, windowDuration = Seconds(30), slideDuration = Seconds(10)) 
    .print() 

dstream.foreachRDD { rdd => 
    // Get the offset ranges in the RDD 
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    for (o <- offsetRanges) { 
    println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}") 
    } 
} 

ssc.start 

你會改變什麼,看看您所遇到的異常(S)?

整個項目可用spark-streaming-kafka-direct