
I have the following scenario: checking whether two DataFrames are equal.

I have two DataFrames, each containing only one column, say:

DF1=(1,2,3,4,5) 
DF2=(3,6,7,8,9,10) 

Basically those values are keys, and I create a parquet file of DF1 only if the keys of DF1 are not in DF2 (in the current example it should return false). My current way of achieving this requirement is:

val df1count = DF1.count 
val df2count = DF2.count 
val diffDF = DF2.except(DF1) 
val diffCount = diffDF.count 
if (diffCount == (df2count - df1count)) true 
else false 

The problem with this approach is that I am calling actions four times, which is surely not the best way. Can someone suggest the best way to achieve this?
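(For comparison, the same check can be done with a single Spark action using a left anti join. A minimal sketch, assuming Spark 2.x and that the single column in both DataFrames is named "key", which is a placeholder name:)

// Rows of DF1 whose key does not appear in DF2 ("key" is a placeholder column name)
val missing = DF1.join(DF2, Seq("key"), "left_anti")
// Single action: true when every key of DF1 is also present in DF2
val allKeysPresent = missing.head(1).isEmpty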

Answers


You can use the following function:

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.functions._ 

// Full-outer-join the two DataFrames on `key`, then report for each row whether
// it is new in one of the frames or, if present in both, which columns differ.
def diff(key: String, df1: DataFrame, df2: DataFrame): DataFrame = { 
  val fields = df1.schema.fields.map(_.name) 
  val diffColumnName = "Diff" 

  df1 
    .join(df2, df1(key) === df2(key), "full_outer") 
    .withColumn( 
      diffColumnName, 
      when(df1(key).isNull, "New row in DataFrame 2")     // key only in df2 
        .otherwise( 
          when(df2(key).isNull, "New row in DataFrame 1") // key only in df1 
            .otherwise(                                   // key in both: list the differing columns 
              concat_ws("", 
                fields.map(f => when(df1(f) =!= df2(f), s"$f ").otherwise("")): _* 
              ) 
            ) 
        ) 
    ) 
    .filter(col(diffColumnName) =!= "")                   // drop rows that are identical in both 
    .select( 
      fields.map(f => 
        when(df1(key).isNotNull, df1(f)).otherwise(df2(f)).alias(f) 
      ) :+ col(diffColumnName): _* 
    ) 
} 

In your case, run:

diff("emp_id", df1, df2) 

And here is a short, self-contained example:

import org.apache.spark.sql.{DataFrame, SparkSession} 
import org.apache.spark.sql.functions._ 

object DiffDataFrames extends App { 
    val session = SparkSession.builder().master("local").getOrCreate() 

    import session.implicits._ 

    val df1 = session.createDataset(Seq((1,"a",11),(2,"b",2),(3,"c",33),(5,"e",5))).toDF("n", "s", "i") 
    val df2 = session.createDataset(Seq((1,"a",11),(2,"bb",2),(3,"cc",34),(4,"d",4))).toDF("n", "s", "i") 

    def diff(key: String, df1: DataFrame, df2: DataFrame): DataFrame = 
     ??? // paste the definition given above 

    diff("n", df1, df2).show(false) 
} 
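For the sample data above, the call should flag n=2 (column s differs), n=3 (columns s and i differ), n=5 as a new row in DataFrame 1, and n=4 as a new row in DataFrame 2; the n=1 row is identical in both frames and is filtered out.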

Could you let me know how to declare df1 and df2? I have declared them as follows: sqlContext = SQLContext(sc); df = sqlContext.sql("select * from table1"); df2 = sqlContext.sql("select * from table2"), then copied the above code as-is and I am getting syntax errors. I am very new to Spark and Scala.


Could you tell me what I am doing wrong? When I try to run the code below, I get an error: not found: value df1, not found: value df2: import org.apache.spark.sql.{DataFrame, SQLContext}; import org.apache.spark.sql.functions._; val sc: SparkContext; val sqlContext = new org.apache.spark.sql.SQLContext(sc); sqlContext = SQLContext(sc); df1 = sqlContext.sql("select * from table1"); df2 = sqlContext.sql("select * from table2"); diff("tenant", df1, df2); def diff(key: String, df1: DataFrame, df2: DataFrame): DataFrame = { ... } // function code as provided


Hi, I added a short example above.
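As an aside to the two comments above: the "not found" and syntax errors come from Python-style assignments (df1 = ...) in Scala, and from calling diff before it is defined. A corrected sketch, assuming a pre-2.0 SQLContext setup and that table1 and table2 are visible to it:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Scala requires `val` (or `var`) when binding a name
val sc = new SparkContext(new SparkConf().setAppName("diff-example").setMaster("local"))
val sqlContext = new SQLContext(sc)

val df1 = sqlContext.sql("select * from table1")
val df2 = sqlContext.sql("select * from table2")

// In a script, paste the diff(...) definition above this line before calling it
diff("tenant", df1, df2).show(false)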


Below is a way to get the uncommon rows between two DataFrames:

import spark.implicits._ // spark is the SparkSession

val d1 = Seq(
  (3, "Chennai", "rahman", "9848022330", 45000, "SanRamon"),
  (1, "Hyderabad", "ram", "9848022338", 50000, "SF"),
  (2, "Hyderabad", "robin", "9848022339", 40000, "LA"),
  (4, "sanjose", "romin", "9848022331", 45123, "SanRamon"))
val d2 = Seq(
  (3, "Chennai", "rahman", "9848022330", 45000, "SanRamon"),
  (1, "Hyderabad", "ram", "9848022338", 50000, "SF"),
  (2, "Hyderabad", "robin", "9848022339", 40000, "LA"),
  (4, "sanjose", "romin", "9848022331", 45123, "SanRamon"),
  (4, "sanjose", "romino", "9848022331", 45123, "SanRamon"),
  (5, "LA", "Test", "1234567890", 12345, "Testuser"))

val df1 = d1.toDF("emp_id", "emp_city", "emp_name", "emp_phone", "emp_sal", "emp_site")
val df2 = d2.toDF("emp_id", "emp_city", "emp_name", "emp_phone", "emp_sal", "emp_site")

// Register temp views so the DataFrames can be referenced from Spark SQL
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

// MINUS is equivalent to EXCEPT: keep the union minus the intersection
spark.sql("((select * from df1) union (select * from df2)) minus ((select * from df1) intersect (select * from df2))").show
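The same set arithmetic can be written with the DataFrame API (a sketch; union() keeps duplicates, but except() returns distinct rows, so the result matches the SQL above):

// Distinct rows that appear in only one of the two DataFrames
val uncommon = df1.union(df2).except(df1.intersect(df2))
uncommon.show()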