2015-05-05 28 views
6

Spark數據流分批處理數據。批量間Spark數據流共享

使用RDD並行處理每個間隔數據,而不在每個間隔之間共享任何數據。

但我的用例需要在間隔之間共享數據。

考慮Network WordCount示例,該示例產生在該間隔中接收的所有單詞的計數。

我會如何產生以下字數?

  • 的話「的Hadoop」和「火花」與之前的間隔相對計數統計

  • 普通字數所有換句話說。

注意:UpdateStateByKey進行有狀態處理,但是這適用於每條記錄而不是特定記錄的函數。

因此,UpdateStateByKey不適合此要求。

更新:

考慮下面的例子

間隔-1

輸入:

Sample Input with Hadoop and Spark on Hadoop 

輸出:

hadoop 2 
sample 1 
input 1 
with 1 
and 1 
spark 1 
on 1 

間隔-2

輸入:

Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark 

輸出:

another 3 
hadoop 1 
spark 2 
and 2 
sample 1 
input 1 
with 1 
on 1 

說明:

第一個區間給出所有單詞的正常字數。

在第二間隔的hadoop發生3次,但輸出應爲1(3-2)

火花發生3次,但輸出應爲2(3-1)

對於所有換句話說它應該給正常的字數。

所以,在處理第二時間間隔的數據,它應具有的hadoop火花

的第一間隔的字計數這是一個簡單的例子與圖。

在實際使用情況下,需要數據共享的字段是RDD元素(RDD)的一部分,需要跟蹤大量的值。

即,在這個例子中,像hadoop和spark關鍵字將近100k個關鍵字被跟蹤。

類似usecases在Apache的風暴:

Distributed caching in storm

Storm TransactionalWords

+2

請澄清「的話‘Hadoop的’相對計數和‘火花’與以前間隔計數」。不要猶豫,正式介紹變量和公式。你也可以舉一個例子。 – huitseeker

+0

篩選出不需要的和updateStateByKey? –

+0

添加示例.. –

回答

8

這可能是由「記憶」最後RDD接收並使用左連接合並與一個流式數據批量。我們利用streamingContext.remember來使流媒體過程產生的RDD能夠保留我們所需的時間。

我們利用了這樣一個事實,即dstream.transform是一個在驅動程序上執行的操作,因此我們可以訪問所有本地對象定義。具體而言,我們希望更新每個批次上所需值的最後一個RDD的可變引用。

可能的一段代碼,使這一想法更加清晰:

// configure the streaming context to remember the RDDs produced 
// choose at least 2x the time of the streaming interval 
ssc.remember(xx Seconds) 

// Initialize the "currentData" with an empty RDD of the expected type 
var currentData: RDD[(String, Int)] = sparkContext.emptyRDD 

// classic word count 
val w1dstream = dstream.map(elem => (elem,1))  
val count = w1dstream.reduceByKey(_ + _)  

// Here's the key to make this work. Look how we update the value of the last RDD after using it. 
val diffCount = count.transform{ rdd => 
       val interestingKeys = Set("hadoop", "spark")    
       val interesting = rdd.filter{case (k,v) => interestingKeys(k)}         
       val countDiff = rdd.leftOuterJoin(currentData).map{case (k,(v1,v2)) => (k,v1-v2.getOrElse(0))} 
       currentData = interesting 
       countDiff     
       } 

diffCount.print() 
+0

streamingContext.remember - 請記住它在上次給定的持續時間內生成的RDD(可能是緩存/持久化)。有很多與此StreamContext相關的RDD轉換和操作。因此,很多RDD需要記住.. –

+0

我有興趣瞭解更多有關dstream.transform是在驅動程序上執行的操作的事實。您可以請任何參考.. –

+0

我認爲leftOuterJoin將是昂貴的操作時,數據量巨大。即,如果10萬條記錄和當前數據是分佈在集羣中的10萬條記錄,則爲rdd。 –