2016-12-08 96 views
0

我有(Key,Value)類型的dstream。DStream所有相同的密鑰應按順序處理

mapped2.foreachRDD(rdd => { 
    rdd.foreachPartition(p => { 
    p.foreach(x => { 
    } 
)}) 
}) 

我需要得到保證,使用相同的密鑰的所有項目在一個分區和一個core..so居然有順序處理處理..

如何做到這一點?我可以使用效率低下的GroupBykey嗎?

回答

1

您可以使用PairDStreamFunctions.combineByKey

import org.apache.spark.HashPartitioner 
import org.apache.spark.streaming.dstream.DStream 
/** 
    * Created by Yuval.Itzchakov on 29/11/2016. 
    */ 
object GroupingDStream { 
    def main(args: Array[String]): Unit = { 
    val pairs: DStream[(String, String)] = ??? 
    val numberOfPartitions: Int = ??? 

    val groupedByIds: DStream[(String, List[String])] = pairs.combineByKey[List[String]](
     _ => List[String](), 
     (strings: List[String], s: String) => s +: strings, 
     (first: List[String], second: List[String]) => first ++ second, new HashPartitioner(numberOfPartitions)) 

    groupedByIds.foreachRDD(rdd => { 
     rdd.foreach((kvp: (String, List[String])) => { 

     }) 
    }) 
    } 
} 

combineByKey結果將與第一個元素是關鍵和第二個元素的值的集合的元組。注意爲了簡化示例,我使用了(String, String),因爲您沒有提供任何類型。

然後,使用foreach迭代值列表並按順序處理它們(如果需要)。請注意,如果您需要應用其他轉換,則可以使用DStream.map並對第二個元素(值列表)進行操作,而不是使用foreachRDD

+0

嗨,我感謝你的回答..我可以使用另一個函數,如partionbykey(以避免鍵值對的低效分組)。然後我的上面的代碼可以保證值是串行執行的(即在一個核心每執行人)?或者它應該通過分組。即通過來自鍵值對的值來訪問? – mahdi62

+0

@ mahdi62你爲什麼認爲combineByKey是無效的?它將在一個執行器內部將所有相似的密鑰局部組合在一起,並且只通過導線混合組合結果。 –

+0

該代碼實際上給一個項目的鍵,值對的空列表...我猜組合器應該是(x:String)=> List [String](x), – mahdi62