2017-04-03 44 views
1

以下是我正在嘗試的操作。我想在兩個不同的數據框中對兩列中的每個條目進行比較。的dataframes如下所示:如何在PySpark 1.6.1中將第二個數據幀的列傳遞給UDF

>>> subject_df.show() 
+------+-------------+ 
|USERID|  FULLNAME| 
+------+-------------+ 
| 12345| steve james| 
| 12346| steven smith| 
| 43212|bill dunnigan| 
+------+-------------+ 

>>> target_df.show() 
+------+-------------+ 
|USERID|  FULLNAME| 
+------+-------------+ 
|111123| steve tyler| 
|422226| linda smith| 
|123333|bill dunnigan| 
| 56453| steve smith| 
+------+-------------+ 

這裏是我嘗試使用的邏輯:

# CREATE FUNCTION  
def string_match(subject, targets): 
    for target in targets: 
     <logic> 
    return logic_result 

# CREATE UDF 
string_match_udf = udf(string_match, IntegerType()) 

# APPLY UDF 
subject_df.select(subject_df.FULLNAME, string_match_udf(subject_df.FULLNAME, target_df.FULLNAME).alias("score")) 

這在pyspark殼運行代碼時出現錯誤:

py4j.protocol.Py4JJavaError: An error occurred while calling o45.select. 
: java.lang.RuntimeException: Invalid PythonUDF PythonUDF#string_match(FULLNAME#2,FULLNAME#5), requires attributes from more than one child. 

我認爲我的問題的根源是試圖將第二列傳遞給函數。我應該使用RDD嗎?請記住,實際的subject_df和target_df都是超過100,000行。我願意接受任何建議。

回答

3

它看起來像你有一個錯誤的想法用戶自定義函數是如何工作的:

  • 功能從只有一行的時候
  • 您不能從無關DataFame使用數據接收值。

做你想做的事情的唯一方法就是採用笛卡爾產品。

subject_df.join(target_df).select(
f(subject_df.FULLNAME, target_df.FULLNAME) 
) 

其中f是當時比較兩個元素的函數。

+0

任何關於創建100,000,000,000行長的笛卡爾產品的擔憂? – datanerdjake

相關問題