2017-01-18

I want to include null values in an Apache Spark join. By default, Spark does not include rows with null join keys in the result.

This is the default Spark behavior:

val numbersDf = Seq(
    ("123"), 
    ("456"), 
    (null), 
    ("") 
).toDF("numbers") 

val lettersDf = Seq(
    ("123", "abc"), 
    ("456", "def"), 
    (null, "zzz"), 
    ("", "hhh") 
).toDF("numbers", "letters") 

val joinedDf = numbersDf.join(lettersDf, Seq("numbers")) 

Here is the output of joinedDf.show():

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
+-------+-------+

Here is the output I want:

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
|   null|    zzz|
+-------+-------+
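The null row is dropped because SQL comparisons use three-valued logic: null = null evaluates to NULL (unknown), not true, and a join only keeps rows whose condition evaluates to true. A plain-Python sketch of this behavior (illustrative only, not Spark code):

```python
# Sketch of SQL equi-join semantics in plain Python.
# In SQL's three-valued logic, any comparison involving NULL yields NULL
# (unknown), which a join treats as "no match" -- so null keys never pair up.

def sql_eq(a, b):
    """Plain SQL equality: returns None (unknown) if either side is NULL."""
    if a is None or b is None:
        return None
    return a == b

numbers = ["123", "456", None, ""]
letters = [("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")]

# A naive nested-loop equi-join that keeps only rows where the condition
# is exactly True (NULL/unknown is filtered out, mirroring Spark's join).
joined = [(n, l) for n in numbers for (k, l) in letters if sql_eq(n, k) is True]
print(joined)  # → [('123', 'abc'), ('456', 'def'), ('', 'hhh')] -- no (None, 'zzz') pair
```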

Answer

Spark provides a special NULL-safe equality operator:

numbersDf 
    .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers")) 
    .drop(lettersDf("numbers")) 

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|   null|    zzz|
|       |    hhh|
+-------+-------+
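The `<=>` operator implements null-safe equality, with the same truth table as SQL's IS NOT DISTINCT FROM: two NULLs compare as true, and a NULL against any non-NULL value compares as false rather than unknown. A minimal Python sketch of that truth table (illustrative, not Spark code):

```python
# Null-safe equality, as implemented by Spark's <=> / Column.eqNullSafe
# (equivalent to SQL's IS NOT DISTINCT FROM); a sketch, not Spark itself.

def eq_null_safe(a, b):
    if a is None and b is None:
        return True   # NULL <=> NULL is true
    if a is None or b is None:
        return False  # NULL <=> value is false, never unknown
    return a == b

print(eq_null_safe(None, None))    # → True
print(eq_null_safe(None, "123"))   # → False
print(eq_null_safe("123", "123"))  # → True
```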

Be careful not to use it with Spark 1.5 or earlier; prior to Spark 1.6 it required a Cartesian product (SPARK-11111 - fast null-safe joins).

In PySpark (Spark 2.3.0 or later) you can use Column.eqNullSafe:

numbers_df = sc.parallelize([ 
    ("123",), ("456",), (None,), ("",) 
]).toDF(["numbers"]) 

letters_df = sc.parallelize([ 
    ("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh") 
]).toDF(["numbers", "letters"]) 

numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers)) 

In SparkR, use the %<=>% operator:

numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, ""))) 
letters_df <- createDataFrame(data.frame(
    numbers = c("123", "456", NA, ""), 
    letters = c("abc", "def", "zzz", "hhh") 
)) 

head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers))

  numbers numbers letters
1     456     456     def
2    <NA>    <NA>     zzz
3                     hhh
4     123     123     abc

With SQL (Spark 2.2.0 or later) you can use IS NOT DISTINCT FROM; the same condition works inside a DataFrame-API where clause:

numbersDf.alias("numbers") 
    .join(lettersDf.alias("letters")) 
    .where("numbers.numbers IS NOT DISTINCT FROM letters.numbers") 
Thanks. [Here is another good answer](http://stackoverflow.com/questions/31240148/spark-specify-multiple-column-conditions-for-dataframe-join) that uses the `<=>` operator. If you are joining on multiple columns, you can chain the conditions with the `&&` operator. – Powers

Another answer:
val numbers2 = numbersDf.withColumnRenamed("numbers","num1") //rename columns so that we can disambiguate them in the join 
val letters2 = lettersDf.withColumnRenamed("numbers","num2") 
val joinedDf = numbers2.join(letters2, $"num1" === $"num2" || ($"num1".isNull && $"num2".isNull) ,"outer") 
joinedDf.select("num1","letters").withColumnRenamed("num1","numbers").show //rename the columns back to the original names 
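The condition above, `$"num1" === $"num2" || ($"num1".isNull && $"num2".isNull)`, is logically the same predicate as null-safe equality. A quick Python sketch (illustrative names, not Spark code) checking the equivalence over sample keys, with the plain `===` comparison treating any NULL operand as "no match", the way a join condition does:

```python
# Sketch: the workaround predicate (equal OR both-null) agrees with
# null-safe equality for every combination of sample join keys.

def workaround(a, b):
    # a === b in a join condition treats NULL comparisons as "not a match"
    plain_equal = a is not None and b is not None and a == b
    return plain_equal or (a is None and b is None)

def eq_null_safe(a, b):
    if a is None and b is None:
        return True
    if a is None or b is None:
        return False
    return a == b

keys = ["123", "456", None, ""]
agree = all(workaround(x, y) == eq_null_safe(x, y) for x in keys for y in keys)
print(agree)  # → True
```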

In Spark SQL the same join can be written as:

SELECT * FROM numbers JOIN letters
ON numbers.numbers IS NOT DISTINCT FROM letters.numbers

This can be expressed with the DataFrame API as well.
