2

如何在另一個數據幀上執行UDF時引用pyspark數據框?Pyspark:如何在另一個數據框中的UDF中引用數據框?

這是一個虛擬的例子。我創建了兩個數據幀scoreslastnames,並且在每個數據幀中存在兩個數據幀相同的列。在應用於scores的UDF中,我想過濾lastnames並返回在lastname中找到的字符串。

from pyspark import SparkContext 
from pyspark import SparkConf 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 

sc = SparkContext("local") 
sqlCtx = SQLContext(sc) 


# Generate Random Data 
import itertools 
import random 
student_ids = ['student1', 'student2', 'student3'] 
subjects = ['Math', 'Biology', 'Chemistry', 'Physics'] 
random.seed(1) 
data = [] 

for (student_id, subject) in itertools.product(student_ids, subjects): 
    data.append((student_id, subject, random.randint(0, 100))) 

from pyspark.sql.types import StructType, StructField, IntegerType, StringType 
schema = StructType([ 
      StructField("student_id", StringType(), nullable=False), 
      StructField("subject", StringType(), nullable=False), 
      StructField("score", IntegerType(), nullable=False) 
    ]) 

# Create DataFrame 
rdd = sc.parallelize(data) 
scores = sqlCtx.createDataFrame(rdd, schema) 

# create another dataframe 
last_name = ["Granger", "Weasley", "Potter"] 
data2 = [] 
for i in range(len(student_ids)): 
    data2.append((student_ids[i], last_name[i])) 

schema = StructType([ 
      StructField("student_id", StringType(), nullable=False), 
      StructField("last_name", StringType(), nullable=False) 
    ]) 

rdd = sc.parallelize(data2) 
lastnames = sqlCtx.createDataFrame(rdd, schema) 


scores.show() 
lastnames.show() 


from pyspark.sql.functions import udf 
def getLastName(sid): 
    tmp_df = lastnames.filter(lastnames.student_id == sid) 
    return tmp_df.last_name 

getLastName_udf = udf(getLastName, StringType()) 
scores.withColumn("last_name", getLastName_udf("student_id")).show(10) 

而下面是跟蹤的最後一部分:

Py4JError: An error occurred while calling o114.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344) 
    at py4j.Gateway.invoke(Gateway.java:252) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
+0

您不能在UDF內部訪問'df',因爲它將在執行程序中處理,'df' ref只能從驅動程序訪問。你可以使用廣播變量作爲'lastnames'。讓我知道是否需要任何幫助。 – mrsrinivas

+0

但是考慮將'lastnames'加入'scores'而不是從UDF中加入。 – mrsrinivas

+0

嗨@mrsrinivas,謝謝你的回覆。首先我不能加入,因爲即使這個虛擬示例可以使用連接來解決,在我的實際實現中,我需要在UDF中執行更多的處理。其次,是的!我如何在這種情況下使用廣播變量? – tohweizhong

回答

2

更改配對字典而不是創造rdd並使得人們df的名字

data2 = {} 
for i in range(len(student_ids)): 
    data2[student_ids[i]] = last_name[i] 

容易查找創建廣播變量

//rdd = sc.parallelize(data2) 
//lastnames = sqlCtx.createDataFrame(rdd, schema) 
lastnames = sc.broadcast(data2) 

現在通過廣播變量(lastnames)在udf上使用values attr來訪問。

from pyspark.sql.functions import udf 
def getLastName(sid): 
    return lastnames.value[sid] 
+1

我用**廣播變量**修改了你的實現。儘量讓你的UDF儘可能多的純功能,太多的外部依賴可能會降低性能。 – mrsrinivas

+0

我試過了你的代碼片段 - 當我看到'lastnames.value'時,我得到'[('student1','Granger'),('student2','Weasley'),('student3','Potter') ]',這意味着'lastnames.value.filter'不會工作了嗎?看起來好像是 – tohweizhong

+0

在udf中嘗試'return lastnames.value [「sid」]'並創建一個字典(變量'data2'),其中'sid'作爲鍵和值作爲'lastname'。 – mrsrinivas

2

您不能直接從UDF內引用數據幀(或RDD)。 DataFrame對象是驅動程序上的一個句柄,用於表示將在羣集上發生的數據和操作。在您選擇Spark時,UDF中的代碼將在羣集上運行。 Spark通過對該代碼進行序列化並將閉包中包含的任何變量的副本發送給每個工作人員來完成此操作。

你想做什麼,是使用Spark提供的API來加入/組合兩個DataFrame。如果其中一個數據集很小,則可以手動發送廣播變量中的數據,然後從UDF訪問它。否則,您可以像創建兩個數據框一樣創建兩個數據框,然後使用連接操作來合併它們。像這樣的東西應該工作:

joined = scores.withColumnRenamed("student_id", "join_id") 
joined = joined.join(lastnames, joined.join_id == lastnames.student_id)\ 
       .drop("join_id") 
joined.show() 

+---------+-----+----------+---------+ 
| subject|score|student_id|last_name| 
+---------+-----+----------+---------+ 
|  Math| 13| student1| Granger| 
| Biology| 85| student1| Granger| 
|Chemistry| 77| student1| Granger| 
| Physics| 25| student1| Granger| 
|  Math| 50| student2| Weasley| 
| Biology| 45| student2| Weasley| 
|Chemistry| 65| student2| Weasley| 
| Physics| 79| student2| Weasley| 
|  Math| 9| student3| Potter| 
| Biology| 2| student3| Potter| 
|Chemistry| 84| student3| Potter| 
| Physics| 43| student3| Potter| 
+---------+-----+----------+---------+ 

另外值得一提的,是引擎蓋下星火DataFrames有一個優化,其中一個數據幀是的加入可以被轉換成廣播變量,以避免洗牌如果是部分夠小。因此,如果您執行上面列出的聯接方法,您應該獲得最佳性能,而不會犧牲處理大型數據集的能力。

相關問題