2016-03-25 165 views
1

Restarting reduceByKeyAndWindow

I'm new to Spark and Scala, and I'm using reduceByKeyAndWindow to count words in Kafka messages because I need the windowing functionality.

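The goal of my application is to send an alert when a specific word has been seen "x" times in Kafka messages within a given period of time. After that, counting should start again from scratch.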

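The code below detects the word, but I can't get my application to start over. I would like to reset what reduceByKeyAndWindow has accumulated, if that is possible, or find some other way to do this.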

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

object KafKaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafKaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    ssc.checkpoint("checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999) // using netcat for testing
    val wordCounts =
      lines.map(x => (x, 1))
        .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2)

    // If the count for a key (word) exceeds 10, send an alert and reset the counts
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Using the second example from Yuval Itzchakov's answer, I lowered the threshold from 10 to 3 and sent 7 messages.

The output from the second example is:

Word: hello reached count: 1 
Word: hello reached count: 2 
// This message is not printed, which is fine, but the next count does not start at 1
Word: hello reached count: 4 
Word: hello reached count: 5 
Word: hello reached count: 6 
Word: hello reached count: 7 

but I expected:

Word: hello reached count: 1 
Word: hello reached count: 2 

Word: hello reached count: 1 
Word: hello reached count: 2 

Word: hello reached count: 1 
+1

It's a bit hard to understand what you want, but I get the impression that you actually need a more general function, such as `updateStateByKey`. – zero323

+0

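Thank you, I'll read up on this. Could you show me an example of how to use that function? Suppose I receive **hello** repeatedly. I will accumulate **(hello, 1)**, **(hello, 2)**, **(hello, 3)**, and when **hello** exceeds **5** I will send an alert, and **hello** needs to go back to **0** and start again. Sorry, I'm also new to this site. Thanks again! –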

+0

Not a particularly pretty example: http://stackoverflow.com/a/35565682/1560062 – zero323

Answer

0

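If you are using Spark 1.6.0 or later, you can use the experimental DStream.mapWithState to keep and update the state of your word counts. Once the limit is reached, you can remove the state, emit the result into the pipeline, and print it with DStream.foreachRDD: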

import org.apache.spark._
import org.apache.spark.streaming._ // brings State and StateSpec into scope (Spark 1.6.0+)

object KafKaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafKaWordCount")

    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999) // using netcat for testing
    val stateSpec = StateSpec.function(updateWordCount _)

    lines.map(x => (x, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(2), 2)
      .mapWithState(stateSpec)
      .flatMap(_.toList) // drop None, keep the emitted (word, count) pairs
      .foreachRDD(rdd =>
        rdd.foreach { case (word, count) =>
          println(s"Word: $word reached count: $count")
        })

    ssc.start()
    ssc.awaitTermination()
  }

  // Called once per key and batch with the current windowed count for that key.
  def updateWordCount(key: String,
                      value: Option[Int],
                      state: State[(String, Int)]): Option[(String, Int)] = {
    def updateCountState(count: Int): Option[(String, Int)] = {
      if (count == 10) {
        // Limit reached: clear the state and emit the pair downstream
        if (state.exists()) state.remove()
        Some((key, count))
      } else {
        // Below the limit: remember the latest count and emit nothing
        state.update((key, count))
        None
      }
    }

    value match {
      case Some(count) => updateCountState(count)
      case _ => None
    }
  }
}
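One likely reason the output in the question keeps climbing (4, 5, 6, 7) is that removing the mapWithState state does not touch the total that reduceByKeyAndWindow maintains upstream, so the next value passed to the mapping function is still the full windowed count. The following is a minimal sketch of the count-and-reset rule on its own, in plain Scala with no Spark dependency. The names `CountAndReset`, `step`, and `Threshold` are hypothetical, and the threshold of 3 is taken from the test described in the question.

```scala
object CountAndReset {
  // Hypothetical threshold, matching the value 3 used in the question's test.
  val Threshold = 3

  /** One update step for a single word.
    * Adds the new occurrences to the previous running count and returns
    * (new state, alert flag). A state of None means the counter was cleared. */
  def step(previous: Option[Int], newOccurrences: Int): (Option[Int], Boolean) = {
    val total = previous.getOrElse(0) + newOccurrences
    if (total >= Threshold) (None, true) // threshold reached: reset and alert
    else (Some(total), false)            // keep counting
  }

  def main(args: Array[String]): Unit = {
    // Simulate seven batches, each containing one occurrence of "hello".
    var state: Option[Int] = None
    for (_ <- 1 to 7) {
      val (next, alert) = step(state, 1)
      if (alert) println("Alert: threshold reached, counter reset")
      else println(s"Word: hello reached count: ${next.get}")
      state = next
    }
  }
}
```

The point of the sketch is that the running count lives in the state and is fed raw per-batch occurrences, so clearing the state really does restart the count. Feeding it windowed totals instead would reintroduce the behaviour seen in the question.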

If you are on a version earlier than 1.6.0, you can fall back to the slower DStream.updateStateByKey:

import org.apache.spark._
import org.apache.spark.streaming._

object KafKaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("KafKaWordCount")
    val ssc = new StreamingContext(conf, Seconds(2))

    ssc.checkpoint("checkpoint")

    val lines = ssc.socketTextStream("localhost", 9999) // using netcat for testing

    // Carry the word inside the value so that the update function can report it
    lines.map(x => (x, (x, 1)))
      .reduceByKeyAndWindow(
        (first: (String, Int), second: (String, Int)) => (first._1, first._2 + second._2),
        Seconds(60), Seconds(60), 2)
      .updateStateByKey(updateSeqCount _)
      .print(1)

    ssc.start()
    ssc.awaitTermination()
  }

  // Returning None removes the state for the key; returning Some keeps it.
  def updateSeqCount(values: Seq[(String, Int)],
                     state: Option[(String, Int)]): Option[(String, Int)] = {
    if (values.isEmpty) state
    else {
      val (word, count) = values.head
      if (count == 10) {
        println(s"Key: $word reached count $count!")
        None
      } else Some((word, count))
    }
  }
}
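For comparison, here is a hedged sketch of an update function in the `(Seq[V], Option[S]) => Option[S]` shape that updateStateByKey expects, written so that it adds the new per-batch values to the stored count instead of replacing the stored count with a windowed total. It is plain Scala and runs without Spark. `RunningCount` and `updateRunningCount` are hypothetical names, and the sketch assumes the stream is `lines.map(x => (x, 1))` passed straight to updateStateByKey, which means the 60-second window is dropped and counts never expire on their own.

```scala
object RunningCount {
  // Limit used in the answer's examples.
  val Threshold = 10

  /** Adds this batch's occurrences to the stored count.
    * Returns None (state removed, counter restarts) once the threshold is reached. */
  def updateRunningCount(values: Seq[Int], state: Option[Int]): Option[Int] = {
    val total = state.getOrElse(0) + values.sum
    if (total >= Threshold) {
      println(s"Alert: count $total reached the threshold")
      None
    } else Some(total)
  }

  def main(args: Array[String]): Unit = {
    // Simulated batches for one word; each 1 is a single occurrence.
    val batches = Seq(Seq(1, 1, 1), Seq.empty[Int], Seq(1, 1, 1, 1), Seq(1, 1, 1), Seq(1))
    val finalState = batches.foldLeft(Option.empty[Int]) { (s, batch) =>
      updateRunningCount(batch, s)
    }
    println(s"State after all batches: $finalState")
  }
}
```

Because the update function does not receive the key, this variant cannot name the word in its alert, which is presumably why the answer's version carries the word inside the value. Whether losing the time window is acceptable depends on how strictly "within a given period" has to be enforced.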
+0

Thank you, I'll try it. –

+0

How do I use StateSpec? I'm trying to reproduce your example; am I missing some imports? Really sorry for all the questions! I'm very new to Spark. –

+0

See the bottom example, which uses `updateStateByKey`, rather than the one above it. `StateSpec` is for 1.6.0. –