2015-01-05 61 views
2

我可以使用下面的代碼在兩個RDD中打印數據。比較火花中兩個RDD中的數據

usersRDD.foreach(println) 
empRDD.foreach(println) 

我需要比較兩個RDD中的數據。如何迭代和比較一個RDD中的字段數據與另一個RDD中的字段數據。例如:重複記錄並檢查userRDD中的姓名和年齡在empRDD中是否有匹配記錄,如果沒有放入單獨的RDD。

我試過userRDD.substract(empRDD),但是它比較了所有的字段。

回答

5

您需要在每個RDD中鍵入數據,以便有些事情可以加入記錄。例如,看看groupBy。然後你生成RDD。對於每個鍵,您都會得到兩個匹配的值。如果你有興趣在尋找無與倫比的鑰匙,使用leftOuterJoin,像這樣:

// Returns the entries in userRDD that have no corresponding key in empRDD. 
def nonEmp(userRDD: RDD[(String, String)], empRDD: RDD[(String, String)]) = { 
    userRDD.leftOuterJoin(empRDD).collect { 
    case (name, (age, None)) => name -> age 
    } 
} 
+1

聽起來像OP可能有興趣找到存在於一個RDD中並從另一個RDD中丟失的密鑰。爲此,您需要'leftOuterJoin'而不是'join'。在答案中可能值得一提。 –

+0

謝謝Sean,但我需要從兩個RDD中找到不匹配的數據。如果您提供示例代碼,這將非常棒。 – Ramakrishna

+0

啊謝謝@DanielDarabos,這是正確的答案。我誤解了原文。 –

1

當然,上述解決方案是完整和正確的!只有一個建議,當且僅當RDD同步(相同的行具有相同的密鑰)。您可以使用分佈式解決方案,並通過通過以下測試的解決方案只用火花轉化利用並行:

def distrCompare(left: RDD[(Int,Int)], right: RDD[(Int,Int)]): Boolean = { 
    val rdd1 = left.join(right).map{case(k, (lv,rv)) => (k,lv-rv)} 
    val rdd2 = rdd1.filter{case(k,v)=>(v!=0)} 
    var equal = true; 
    rdd2.map{ 
    case(k,v)=> if(v!=0) equal = false 
    } 
    return equal 
} 

您可以選擇分區的「加盟」的數量。