
I am very new to Scala and Spark and not sure how to get started. Spark Scala: remove rows in one RDD based on columns of another RDD

I have an RDD that looks like this:

1,2,3,11 
2,1,4,12 
1,4,5,13 
3,5,6,12 

and another one that looks like this:

2,1 
1,2 

I want to filter the first RDD so that any row whose first two columns match any row of the second RDD is removed. The output should look like this:

1,4,5,13 
3,5,6,12 

Answers

// input rdds 
val rdd1 = spark.sparkContext.makeRDD(Seq((1,2,3,11), (2,1,4,12), (1,4,5,13), (3,5,6,12))) 
val rdd2 = spark.sparkContext.makeRDD(Seq((1,2), (2,1))) 

// manipulate the 2 rdds as (key, value) pairs 
// the key of the first rdd is a tuple of its first two fields, the value is the whole record 
// the key of the second rdd is the tuple itself, the value is just null 
// then we can perform joins on their keys 
val rdd1_key = rdd1.map(record => ((record._1, record._2), record)) 
val rdd2_key = rdd2.map(record => (record, null)) 

// 1. perform a left outer join; each record becomes (key, (val1, val2)) 
// 2. filter, keeping only the records that found no match 
//    if there is no match, val2 will be None; otherwise it will be Some(null), 
//    null being the value we hardcoded in the previous step 
// 3. keep only val1 (the original record) 
rdd1_key.leftOuterJoin(rdd2_key) 
    .filter(record => record._2._2.isEmpty) 
    .map(record => record._2._1) 
    .collect().foreach(println(_)) 

// result 
(1,4,5,13) 
(3,5,6,12) 
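
As a side note, the same "anti join" can be written more compactly with subtractByKey, which removes from a pair RDD every key that also appears in another pair RDD. A minimal sketch, reusing rdd1_key and rdd2_key from above:

// drop every record of rdd1_key whose key also occurs in rdd2_key, 
// then keep only the original record (the value of the pair) 
rdd1_key.subtractByKey(rdd2_key) 
    .values 
    .collect().foreach(println(_)) 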

Thanks


Personally I prefer the dataframe/dataset way, since dataframes are optimized RDDs with more built-in functions, and they resemble traditional database tables.

The dataframe way is as follows.

The first step is to convert both rdds to dataframes:

import sqlContext.implicits._ 
val df1 = rdd1.toDF("col1", "col2", "col3", "col4") 
val df2 = rdd2.toDF("col1", "col2") 

The second step is to add a new column to dataframe2, to be checked in the filter condition:

import org.apache.spark.sql.functions._ 
val tempdf2 = df2.withColumn("check", lit("check")) 

The last step is to join the two dataframes, filter the unwanted rows, and drop the unnecessary column:

val finalDF = df1.join(tempdf2, Seq("col1", "col2"), "left") 
          .filter($"check".isNull) 
          .drop($"check") 

You should end up with the final dataframe as:

+----+----+----+----+ 
|col1|col2|col3|col4| 
+----+----+----+----+ 
|3 |5 |6 |12 | 
|1 |4 |5 |13 | 
+----+----+----+----+ 
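
If you are on Spark 2.0 or later, the same result can be obtained in a single step with a left anti join, which keeps only the rows of the left side that have no match on the right. A minimal sketch, assuming the df1 and df2 defined above:

// "left_anti" keeps the rows of df1 whose (col1, col2) pair does not appear in df2, 
// so no helper "check" column is needed 
val antiDF = df1.join(df2, Seq("col1", "col2"), "left_anti") 
antiDF.show() 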

Now you can use finalDF.rdd to convert back to an rdd, or you can continue your further processing with the dataframe itself. I hope the answer is helpful.
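
For illustration, a minimal sketch of the rdd conversion mentioned above (assuming the columns are of type Int, as in the example data):

// finalDF.rdd yields an RDD[Row]; mapping over it recovers plain tuples 
val resultRdd = finalDF.rdd 
    .map(row => (row.getInt(0), row.getInt(1), row.getInt(2), row.getInt(3))) 
resultRdd.collect().foreach(println(_)) 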
