2017-02-24 58 views
1

我有兩個RDD(K,V),在火花它不允許兩個映射嵌套。檢查如果一個RDD(K,V)V是包含在另一個R dd(K,V)V

val x = sc.parallelize(List((1,"abc"),(2,"efg"))) 
val y = sc.parallelize(List((1,"ab"),(2,"ef"), (3,"tag")) 

如果RDD很大,我想檢查「abc」是否包含「ab」。

+0

您是否可以使用所需的輸出更新您正在查找的問題。 –

+0

謝謝,我想知道「abc」是否包含「ab」,輸出如(abc,efg) – ozil

回答

0

假設你想從RDD X時,它的子選擇一個值出現在RDDŸ那麼這段代碼應該工作。

def main(args: Array[String]): Unit = { 
    val x = spark.sparkContext.parallelize(List((1, "abc"), (2, "efg"))) 
    val y = spark.sparkContext.parallelize(List((1, "ab"), (2, "ef"), (3, "tag"))) 

    // This RDD is filtered. That is we are selecting elements from x only if the substring of the value is present in 
    // the RDD y. 
    val filteredRDD = filterRDD(x, y) 
    // Now we map the filteredRDD to our result list 
    val resultArray = filteredRDD.map(x => x._2).collect() 
} 

def filterRDD(x: RDD[(Int, String)], y: RDD[(Int, String)]): RDD[(Int, String)] = { 
    // Broadcasting the y RDD to all spark nodes, since we are collecting this before hand. 
    // The reason we are collecting the y RDD is to avoid call collect in the filter logic 
    val y_bc = spark.sparkContext.broadcast(y.collect.toSet) 
    x.filter(m => { 
    y_bc.value.exists(n => m._2.contains(n._2)) 
    }) 
} 
+1

謝謝,數據量小時使用廣播。 – ozil

相關問題