我可以使用下面的代碼在兩個RDD中打印數據。比較火花中兩個RDD中的數據
usersRDD.foreach(println)
empRDD.foreach(println)
我需要比較兩個RDD中的數據。如何迭代和比較一個RDD中的字段數據與另一個RDD中的字段數據。例如:重複記錄並檢查userRDD
中的姓名和年齡在empRDD
中是否有匹配記錄,如果沒有放入單獨的RDD。
我試過userRDD.substract(empRDD)
,但是它比較了所有的字段。
我可以使用下面的代碼在兩個RDD中打印數據。比較火花中兩個RDD中的數據
usersRDD.foreach(println)
empRDD.foreach(println)
我需要比較兩個RDD中的數據。如何迭代和比較一個RDD中的字段數據與另一個RDD中的字段數據。例如:重複記錄並檢查userRDD
中的姓名和年齡在empRDD
中是否有匹配記錄,如果沒有放入單獨的RDD。
我試過userRDD.substract(empRDD)
,但是它比較了所有的字段。
您需要在每個RDD中鍵入數據,以便有些事情可以加入記錄。例如,看看groupBy
。然後你生成RDD。對於每個鍵,您都會得到兩個匹配的值。如果你有興趣在尋找無與倫比的鑰匙,使用leftOuterJoin
,像這樣:
// Returns the entries in userRDD that have no corresponding key in empRDD.
def nonEmp(userRDD: RDD[(String, String)], empRDD: RDD[(String, String)]) = {
userRDD.leftOuterJoin(empRDD).collect {
case (name, (age, None)) => name -> age
}
}
當然,上述解決方案是完整和正確的!只有一個建議,當且僅當RDD同步(相同的行具有相同的密鑰)。您可以使用分佈式解決方案,並通過通過以下測試的解決方案只用火花轉化利用並行:
def distrCompare(left: RDD[(Int,Int)], right: RDD[(Int,Int)]): Boolean = {
val rdd1 = left.join(right).map{case(k, (lv,rv)) => (k,lv-rv)}
val rdd2 = rdd1.filter{case(k,v)=>(v!=0)}
var equal = true;
rdd2.map{
case(k,v)=> if(v!=0) equal = false
}
return equal
}
您可以選擇分區的「加盟」的數量。
聽起來像OP可能有興趣找到存在於一個RDD中並從另一個RDD中丟失的密鑰。爲此,您需要'leftOuterJoin'而不是'join'。在答案中可能值得一提。 –
謝謝Sean,但我需要從兩個RDD中找到不匹配的數據。如果您提供示例代碼,這將非常棒。 – Ramakrishna
啊謝謝@DanielDarabos,這是正確的答案。我誤解了原文。 –