我真的是新的Spark和Scala,並且我正在使用ReduceByKeyAndWindows來計算kafka消息中的單詞,因爲我需要使用窗口功能。重新啓動ReduceByKeyAndWindows
我的應用程序的目的是當檢測到來自kafka的郵件中包含特定時間的特定單詞的「x」次郵件時發送警報。然後,從頭開始重新開始。
下面的代碼檢測到這個詞,但我不能讓我的應用程序重新啓動。我想如果可能重新啓動ReduceByKeyAndWindows的積累或其他方式來做到這一點。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
object KafKaWordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("KafKaWordCount")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("checkpoint")
val lines = ssc.socketTextStream("localhost", 9999) //using NETCAT for test
val wordCounts =
lines.map(x => (x, 1))
.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2)
//if the value from the key (word) exceeds 10 , sent alert and Restart the values
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
使用從尤瓦Itzchakov 第二示例和減少到達的數量爲10至3和7級發送的郵件。
從第二asnwer輸出是
Word: hello reached count: 1
Word: hello reached count: 2
//No print this message, its OK but the next word not start with 1
Word: hello reached count: 4
Word: hello reached count: 5
Word: hello reached count: 6
Word: hello reached count: 7
,我希望
Word: hello reached count: 1
Word: hello reached count: 2
Word: hello reached count: 1
Word: hello reached count: 2
Word: hello reached count: 1
它有點難以理解你想要的東西,但是我得到這樣的印象,你實際上需要一些更通用的函數,比如'updateStateByKey'。 – zero323
謝謝你,我會讀這個, 你可以告訴我一個例子,我可以如何使用該功能? supose,我recibe此: ** **你好我 將累積此 **(你好,1)** ** (你好,2)** ** (你好,3)** 而當,**你好**超過** 5 ** **我會發送提醒和**你好**需要** 0 **並重新開始.. 對不起,我也是這個頁面的新手。再次感謝! –
不是那麼漂亮的例子:http://stackoverflow.com/a/35565682/1560062 – zero323