I have two RDD[(K, V)]s, and Spark does not allow nesting one map inside another. How can I check whether the V of one RDD[(K, V)] is contained in the V of the other RDD[(K, V)]?

val x = sc.parallelize(List((1, "abc"), (2, "efg")))
val y = sc.parallelize(List((1, "ab"), (2, "ef"), (3, "tag")))

The RDDs may be large. I want to check, for example, whether "abc" contains "ab".
Assuming you want to select a value from RDD x when a substring of that value is present in RDD y, this code should work.
def main(args: Array[String]): Unit = {
  val x = spark.sparkContext.parallelize(List((1, "abc"), (2, "efg")))
  val y = spark.sparkContext.parallelize(List((1, "ab"), (2, "ef"), (3, "tag")))

  // Filter x: keep an element only if some value in y is a substring of its value.
  val filteredRDD = filterRDD(x, y)

  // Map the filtered RDD to the result list of matching values.
  val resultArray = filteredRDD.map(x => x._2).collect()
}
def filterRDD(x: RDD[(Int, String)], y: RDD[(Int, String)]): RDD[(Int, String)] = {
  // Collect y up front and broadcast it to all Spark nodes, so that no
  // collect (or any other RDD action) has to run inside the filter logic.
  val y_bc = spark.sparkContext.broadcast(y.collect.toSet)
  x.filter(m => y_bc.value.exists(n => m._2.contains(n._2)))
}
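Stripped of the Spark plumbing, the broadcast-and-filter step is just a substring-containment check over a collected set. A minimal Spark-free sketch (the helper name `filterBySubstrings` is hypothetical, introduced only for illustration):

```scala
object SubstringFilter {
  // Keep the (key, value) pairs from xs whose value contains
  // at least one of the candidate substrings.
  def filterBySubstrings(xs: List[(Int, String)], subs: Set[String]): List[(Int, String)] =
    xs.filter { case (_, v) => subs.exists(v.contains) }

  def main(args: Array[String]): Unit = {
    val xs = List((1, "abc"), (2, "efg"))
    val subs = Set("ab", "ef", "tag")  // the values of y, collected
    println(filterBySubstrings(xs, subs).map(_._2).mkString(","))  // prints abc,efg
  }
}
```

This mirrors exactly what `x.filter(...)` computes per partition once `y` has been broadcast; the broadcast only makes the `subs` set available on every executor.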
Thanks; broadcasting works when the data is small. – ozil
Could you update the question with the desired output you are looking for? –
Thanks. I want to know whether "abc" contains "ab"; the expected output is (abc, efg). – ozil