您可以使用PairDStreamFunctions.combineByKey
:
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream
/**
* Created by Yuval.Itzchakov on 29/11/2016.
*/
object GroupingDStream {
def main(args: Array[String]): Unit = {
val pairs: DStream[(String, String)] = ???
val numberOfPartitions: Int = ???
val groupedByIds: DStream[(String, List[String])] = pairs.combineByKey[List[String]](
_ => List[String](),
(strings: List[String], s: String) => s +: strings,
(first: List[String], second: List[String]) => first ++ second, new HashPartitioner(numberOfPartitions))
groupedByIds.foreachRDD(rdd => {
rdd.foreach((kvp: (String, List[String])) => {
})
})
}
}
的combineByKey
結果將與第一個元素是關鍵和第二個元素的值的集合的元組。注意爲了簡化示例,我使用了(String, String)
,因爲您沒有提供任何類型。
然後,使用foreach
迭代值列表並按順序處理它們(如果需要)。請注意,如果您需要應用其他轉換,則可以使用DStream.map
並對第二個元素(值列表)進行操作,而不是使用foreachRDD
。
嗨,我感謝你的回答..我可以使用另一個函數,如partionbykey(以避免鍵值對的低效分組)。然後我的上面的代碼可以保證值是串行執行的(即在一個核心每執行人)?或者它應該通過分組。即通過來自鍵值對的值來訪問? – mahdi62
@ mahdi62你爲什麼認爲combineByKey是無效的?它將在一個執行器內部將所有相似的密鑰局部組合在一起,並且只通過導線混合組合結果。 –
該代碼實際上給一個項目的鍵,值對的空列表...我猜組合器應該是(x:String)=> List [String](x), – mahdi62