0
我想將一列的值與具有參考值範圍的另一列進行比較。pyspark比較列值與另一列包含值範圍
我曾嘗試使用下面的代碼嘗試:
from pyspark.sql.functions import udf, size
from pyspark.sql.types import *
df1 = sc.parallelize([([1], [1, 2, 3]), ([2], [4, 5, 6,7])]).toDF(["value", "Reference_value"])
intersect = lambda type: (udf(
lambda x, y: (
list(set(x) & set(y)) if x is not None and y is not None else None),
ArrayType(type)))
integer_intersect = intersect(IntegerType())
# df1.select(
# integer_intersect("value", "Reference_value"),
# size(integer_intersect("value", "Reference_value"))).show()
df1=df1.where(size(integer_intersect("value", "Reference_value")) > 0)
df1.show()
上面的代碼工作,如果我們像下面創建數據框:
,因爲價值和refernce_value列ARRAY_TYPE與long_type 但如果我讀數據框與csv然後我無法轉換爲數組類型。這裏DF1從CSV
df1 is as follows df1=
category value Reference value
count 1 1
n_timer n20 n40,n20
frames 54 56
timer n8 n3,n6,n7
pdf FALSE TRUE
zip FALSE FALSE
我想用「Reference_value」列比較「值」列,並推導出兩個新的dataframes其中一個數據幀是過濾行,如果值列不在設定的基準讀取值。
輸出DF2 =
category value Reference value
count 1 1
n_timer n20 n40,n20
zip FALSE FALSE
輸出DF3 =
category value Reference value
frames 54 56
timer n8 n3,n6,n7
pdf FALSE TRUE
是有像array_contains任何更簡單的方法。我嘗試過Array_contains,但不工作
from pyspark.sql.functions import array_contains
df.where(array_contains("Reference_value", df1["vale"]))