2014-10-20 85 views
2

我是新來的火花,我正在使用Spark與Kafka流..Spark流緩存和轉換

我的流媒體持續時間是1秒。

假設我得到100條記錄中的第1批和120個記錄在第二批80條記錄第三批

--> {sec 1 1,2,...100} --> {sec 2 1,2..120} --> {sec 3 1,2,..80} 

我申請我的邏輯在第一批和有一個結果=> RESULT1

我想要在處理第二批時使用result1,並將第二批的result1和120記錄的組合結果設爲=> result2

我試圖緩存結果,但無法在2s中獲取緩存result1 有可能嗎?或者在這裏展示如何實現我的目標?

JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, String.class,String.class, StringDecoder.class,StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2()); 

我處理消息並找到1秒結果的單詞。

if(resultCp!=null){ 
       resultCp.print(); 
       result = resultCp.union(words.mapValues(new Sum())); 

      }else{ 
       result = words.mapValues(new Sum()); 
      } 

resultCp = result.cache(); 

當第二批的resultCp不應該是零,但這樣在任何給定的時間,我有特定秒數據本身我想找到累積的結果則返回空值。做任何一個知道如何做到這一點..

我瞭解到,一旦火花流傳輸開始jssc.start()控制不再在我們的結尾它與火花。那麼是否可以將第一批的結果發送到第二批來查找累計值?

任何幫助非常感謝。提前致謝。

回答

1

我認爲你正在尋找updateStateByKey它創建一個新的DStream通過應用一個cummulative功能提供的DStream和一些狀態。 從星火例子包這個例子涵蓋了問題的情況下:

首先,你需要一個更新功能,是以新的價值觀和先前已知值:

val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
    val currentCount = values.sum 

    val previousCount = state.getOrElse(0) 

    Some(currentCount + previousCount) 
} 

該函數用於創建一個Dstream,可以從一個源碼流中累積數據。就像這樣:

// Create a NetworkInputDStream on target ip:port and count the 
// words in input stream of \n delimited test (eg. generated by 'nc') 
val lines = ssc.socketTextStream(args(0), args(1).toInt) 
val words = lines.flatMap(_.split(" ")) 
val wordDstream = words.map(x => (x, 1)) 

// Update the cumulative count using updateStateByKey 
// This will give a Dstream made of state (which is the cumulative count of the words) 
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) 

來源:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

+0

謝謝,我已經整理出來我自己,感謝您的時間:) – mithra 2014-10-20 11:34:22