2014-07-16

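Spark Streaming cumulative word count

This is a Spark Streaming program written in Scala. It counts the words arriving on a socket once every second. The result is a per-interval word count: for example, the count for second 0 to 1, then the count for second 1 to 2. Is there a way to change this program so that the word count accumulates, i.e. the count of words from time 0 up to now?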

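import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits such as reduceByKey (needed on older Spark 1.x releases)

// Body of main(args: Array[String]); args(0) = hostname, args(1) = port
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a socket stream on target ip:port and count the
// words in the input stream of \n-delimited text (e.g. generated by 'nc').
// Note: this storage level has no replication, which is only acceptable when running locally.
// Replication is necessary in a distributed setting for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
// Counts are computed per 1-second batch only; nothing carries over between batches
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()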
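To make the difference between the two kinds of counts concrete, here is a plain-Scala sketch (no Spark) that computes per-batch counts the way `reduceByKey` does within each batch, and then the running totals the question asks for. The object name and the two sample batches are hypothetical, chosen only for illustration.

```scala
// Plain-Scala illustration (no Spark): per-batch counts vs. cumulative counts.
// The batches below are hypothetical sample input, one Seq of lines per 1-second interval.
object BatchVsCumulative {
  // Count words within a single batch, like words.map(x => (x, 1)).reduceByKey(_ + _)
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => (w, ws.size) }

  // Merge one batch's counts into the running totals
  def merge(total: Map[String, Int], batch: Map[String, Int]): Map[String, Int] =
    batch.foldLeft(total) { case (acc, (w, n)) => acc.updated(w, acc.getOrElse(w, 0) + n) }

  val batches: Seq[Seq[String]] = Seq(
    Seq("a b a"), // interval 0 to 1
    Seq("b c")    // interval 1 to 2
  )

  // What the original program prints: counts for each interval in isolation
  val perBatch: Seq[Map[String, Int]] = batches.map(countBatch)

  // What the question wants: totals from time 0 up to the end of each interval.
  // scanLeft starts from an empty map; .tail drops that initial empty entry.
  val cumulative: Seq[Map[String, Int]] =
    perBatch.scanLeft(Map.empty[String, Int])(merge).tail

  def main(args: Array[String]): Unit =
    perBatch.zip(cumulative).zipWithIndex.foreach { case ((b, c), i) =>
      println(s"batch $i: per-batch=$b cumulative=$c")
    }
}
```

After the second batch, the per-batch result only contains `b` and `c` once each, while the cumulative result also remembers `a` from the first batch and has `b` counted twice.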

Answer

You can use a StateDStream for this, which is what `updateStateByKey` produces. There is an example of a stateful word count in Spark's examples:

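import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits (needed on older Spark 1.x releases)

object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // The original example calls StreamingExamples.setStreamingLogLevels() here. That helper
    // lives in Spark's examples package and only adjusts log levels, so it is omitted.

    // Called once per key: `values` holds that key's counts from the current batch,
    // `state` is its previous cumulative count (None the first time the key is seen)
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Stateful transformations such as updateStateByKey require a checkpoint directory
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in the input stream of \n-delimited text (e.g. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey.
    // This gives a DStream of state (the cumulative count of each word).
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}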

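It works like this: for each batch you get a Seq[T] of new values per key, which you use to update an Option[T] that acts like an accumulator. The state is an Option because the first time a key appears there is no previous value, so it is None until your function returns one. In this example the count is an Int; if you are processing a lot of data you might even need a Long or a BigInt.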
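The mechanism described above can be modeled in plain Scala without Spark. This is a simplified sketch of the semantics, not Spark's implementation: the object name, the `step` helper and the sample batches are hypothetical. It uses `Long` state as suggested; in the Spark program the equivalent change would be giving `updateFunc` the type `(Seq[Int], Option[Long]) => Option[Long]` and calling `wordDstream.updateStateByKey[Long](updateFunc)`.

```scala
// Plain-Scala model (no Spark) of how updateStateByKey applies an update function per key.
// Simplified sketch: Spark calls the function for every key that has new values or existing
// state, passing the key's new values from the batch and its previous state.
object UpdateStateSimulation {
  // Same shape as the answer's updateFunc, but with Long state to reduce overflow risk
  val updateFunc: (Seq[Int], Option[Long]) => Option[Long] =
    (values, state) => Some(state.getOrElse(0L) + values.sum)

  // Hypothetical helper: apply one batch of (word, 1) pairs to the existing state
  def step(state: Map[String, Long], batch: Seq[(String, Int)]): Map[String, Long] = {
    // Group this batch's values by key, like the Seq[Int] Spark hands to updateFunc
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { k =>
      // Keys missing from this batch get an empty Seq; new keys get None as previous state.
      // A None result would drop the key from the state.
      updateFunc(grouped.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
    }.toMap
  }

  // Hypothetical input: the (word, 1) pairs produced by words.map(x => (x, 1)) in two batches
  val batches: Seq[Seq[(String, Int)]] = Seq(
    Seq("a" -> 1, "b" -> 1, "a" -> 1),
    Seq("b" -> 1, "c" -> 1)
  )

  // State after each batch (.tail drops the empty starting state)
  val states: Seq[Map[String, Long]] =
    batches.scanLeft(Map.empty[String, Long])(step).tail

  def main(args: Array[String]): Unit =
    states.zipWithIndex.foreach { case (s, i) => println(s"after batch $i: $s") }
}
```

Note that `a` keeps its count of 2 after the second batch even though it does not appear there: the function is still called for it with an empty `Seq`, and it simply returns the previous count.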