Hi, I want to write PySpark code that builds a list from a dataframe column. I am using the collect() function in my code, but I am not sure it is the right way to get a filtered list of values out of a dataframe column. Since collect() pulls all the data back to the driver, it would be a bad option for a large dataframe, e.g. 10 GB. What is the best way to avoid the collect() function in PySpark code, and what is the best way to write optimized PySpark code in general?
Below is my input dataframe -
[Row(parent=u'p1', child=u'c1'), Row(parent=u'p11', child=u'p1'),
Row(parent=u'p111', child=u'p11'), Row(parent=u'p2', child=u'c2'),
Row(parent=u'p22', child=u'p2'), Row(parent=u'p222', child=u'p22'),
Row(parent=u'p2222', child=u'p222')]
I want to produce the following output dataframe -
[Row(parent=u'p2222', child1=u'p222', child2=u'p22', child3=u'p2',
child4=u'c2'), Row(parent=u'p111', child1=u'p11', child2=u'p1',
child3=u'c1', child4=None)]
Below is the working code I wrote, but I don't know whether it takes advantage of the optimized processing Spark is known for -
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
customSchema = StructType([StructField('parent',StringType(),True),\
StructField('child',StringType(),True)])
#loading data from a CSV file and creating a dataframe
mydata = sqlContext.load(source='com.databricks.spark.csv',path='/FileStore/tables/34v0qouq1507635707462/parent_child_input.csv',header=True,schema=customSchema)
mydata.registerTempTable('mydata')
#creating a list of values of column "Child" from the dataframe "mydata"
childlist = [x[1] for x in mydata.collect()]
#creating another dataframe with filter values of "Parent" column which are not present in childlist
level1 = mydata.selectExpr('parent','child as child1').where(~mydata.parent.isin(childlist))
i=1
#Function to create dataframe containing desired output as mentioned above
def getChild(level1, i):
    cname = 'child' + str(i)
    tmp = [x[i] for x in level1.collect() if x[i]]
    tmp = list(set(tmp))
    if tmp.count(None) == 1:
        tmp.remove(None)
    level1.registerTempTable('level1')
    if len(tmp) > 0:
        i += 1
        ccname = 'child' + str(i)
        querystr = 'select level1.*,mydata.child as ' + ccname + \
            ' from level1 left outer join mydata on level1.' + cname + '=mydata.parent'
        level1 = sqlContext.sql(querystr)
        level1 = getChild(level1, i)
    return level1
level1 = getChild(level1,i)
level1.drop('child5').show()