
Join PySpark DataFrames on a nested field

I want to perform a join between these two PySpark DataFrames:

from pyspark import SparkContext 
from pyspark.sql import SQLContext, Row 
from pyspark.sql.functions import col 

sc = SparkContext() 
sqlContext = SQLContext(sc)  # required so that RDD.toDF() is available 

# Flat DataFrame: one row per (owner, object, score) 
df1 = sc.parallelize([ 
    ['owner1', 'obj1', 0.5], 
    ['owner1', 'obj1', 0.2], 
    ['owner2', 'obj2', 0.1] 
]).toDF(('owner', 'object', 'score')) 

# Nested DataFrame: objects is an array of structs 
df2 = sc.parallelize([ 
    Row(owner=u'owner1', 
        objects=[Row(name=u'obj1', value=Row(fav=True, ratio=0.3))]) 
]).toDF() 
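
For reference, df2's schema then looks roughly like this (a sketch; Row sorts fields alphabetically, and nullability details may vary):

df2.printSchema() 
## root 
##  |-- objects: array (nullable = true) 
##  |    |-- element: struct (containsNull = true) 
##  |    |    |-- name: string (nullable = true) 
##  |    |    |-- value: struct (nullable = true) 
##  |    |    |    |-- fav: boolean (nullable = true) 
##  |    |    |    |-- ratio: double (nullable = true) 
##  |-- owner: string (nullable = true) 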

The join has to be performed on the object name, i.e. the nested field name inside objects for df2 and the object column for df1.

I am able to perform a SELECT on the nested field, e.g.:

df2.where(df2.owner == 'owner1').select(col("objects.value.ratio")).show() 

but I am not able to run this join:

df2.alias('u').join(df1.alias('s'), col('u.objects.name') == col('s.object')) 

It returns the error:

pyspark.sql.utils.AnalysisException: u"cannot resolve '(objects.name = cast(object as double))' due to data type mismatch: differing types in '(objects.name = cast(object as double))' (array and double);" 

Any ideas how to solve this?

Answer


Since you want to match and extract a specific element, the simplest approach is to explode the row:

from pyspark.sql.functions import col, explode 

# One output row per element of the objects array, exposed as a struct column 
matches = df2.withColumn("object", explode(col("objects"))).alias("u").join( 
    df1.alias("s"), 
    col("s.object") == col("u.object.name") 
) 

matches.show() 
## +-------------------+------+-----------------+------+------+-----+ 
## |            objects| owner|           object| owner|object|score| 
## +-------------------+------+-----------------+------+------+-----+ 
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1|  obj1|  0.5| 
## |[[obj1,[true,0.3]]]|owner1|[obj1,[true,0.3]]|owner1|  obj1|  0.2| 
## +-------------------+------+-----------------+------+------+-----+ 
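
From there the matched nested values can be selected off the exploded struct; a minimal follow-up sketch (the column choice here is just an illustration):

matches.select( 
    col("s.owner"), col("s.object"), 
    col("u.object.value.ratio"), col("s.score") 
).show() 
## +------+------+-----+-----+ 
## | owner|object|ratio|score| 
## +------+------+-----+-----+ 
## |owner1|  obj1|  0.3|  0.5| 
## |owner1|  obj1|  0.3|  0.2| 
## +------+------+-----+-----+ 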

An alternative, but very inefficient, approach is to use array_contains:

from pyspark.sql.functions import expr 

matches_contains = df1.alias("s").join( 
    df2.alias("u"), expr("array_contains(objects.name, object)")) 

It is inefficient because it will be expanded into a Cartesian product:

matches_contains.explain() 
## == Physical Plan == 
## Filter array_contains(objects#6.name,object#4) 
## +- CartesianProduct 
## :- Scan ExistingRDD[owner#3,object#4,score#5] 
## +- Scan ExistingRDD[objects#6,owner#7] 

If the size of the array is relatively small it is possible to generate an optimized version of array_contains, as I've shown here: Filter by whether column value equals a list in spark
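
A minimal sketch of that idea, assuming the arrays never hold more than a few elements (max_size below is a hypothetical bound, not something taken from the data): replace array_contains with an OR of per-index equality checks, so each array element is compared directly.

from functools import reduce 

max_size = 3  # hypothetical upper bound on the length of the objects array 

# One equality check per possible array index; an index past the end of a 
# shorter array evaluates to NULL and therefore never matches. 
name_matches = [ 
    df2["objects"].getItem(i).getField("name") == df1["object"] 
    for i in range(max_size) 
] 
condition = reduce(lambda a, b: a | b, name_matches) 

matches_indexed = df1.join(df2, condition) 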