2017-09-03 90 views

pyspark: comparing a column's values against another column that contains a range of reference values

I want to compare the values in one column against another column that holds a set of reference values.

I have tried the following code:

from pyspark.sql.functions import udf, size
from pyspark.sql.types import *

df1 = sc.parallelize([([1], [1, 2, 3]), ([2], [4, 5, 6, 7])]).toDF(["value", "Reference_value"])

# Build a UDF that returns the intersection of two array columns,
# propagating None when either side is missing
intersect = lambda typ: udf(
    lambda x, y: list(set(x) & set(y)) if x is not None and y is not None else None,
    ArrayType(typ))
integer_intersect = intersect(IntegerType())

# df1.select(
#     integer_intersect("value", "Reference_value"),
#     size(integer_intersect("value", "Reference_value"))).show()
df1 = df1.where(size(integer_intersect("value", "Reference_value")) > 0)
df1.show()

The code above works if the dataframe is created as shown, because the value and Reference_value columns are ArrayType of LongType. But when I read the dataframe from a CSV, I am unable to convert those columns to array type. Here, df1 is read from a CSV:

df1 is as follows:

category   value   Reference_value
count      1       1
n_timer    n20     n40,n20
frames     54      56
timer      n8      n3,n6,n7
pdf        FALSE   TRUE
zip        FALSE   FALSE

I want to compare the "value" column against the "Reference_value" column and derive two new dataframes: one holding the rows whose value appears in the set of reference values, and one holding the rows whose value does not.
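Since the CSV columns arrive as plain strings, the comparison described above amounts to splitting the comma-separated reference string and testing for overlap. A minimal plain-Python sketch of that logic, outside Spark (`has_overlap` is a hypothetical helper name):

```python
def has_overlap(value, reference):
    """True if any comma-separated token of `value` appears
    among the comma-separated tokens of `reference`."""
    return bool(set(value.split(",")) & set(reference.split(",")))

rows = [
    ("count", "1", "1"),
    ("n_timer", "n20", "n40,n20"),
    ("frames", "54", "56"),
    ("timer", "n8", "n3,n6,n7"),
    ("pdf", "FALSE", "TRUE"),
    ("zip", "FALSE", "FALSE"),
]

# Split the rows into matches (df2) and non-matches (df3)
matches = [r for r in rows if has_overlap(r[1], r[2])]
non_matches = [r for r in rows if not has_overlap(r[1], r[2])]

print([r[0] for r in matches])      # ['count', 'n_timer', 'zip']
print([r[0] for r in non_matches])  # ['frames', 'timer', 'pdf']
```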

Expected output df2 =

category   value   Reference_value
count      1       1
n_timer    n20     n40,n20
zip        FALSE   FALSE

Expected output df3 =

category   value   Reference_value
frames     54      56
timer      n8      n3,n6,n7
pdf        FALSE   TRUE

Is there a simpler way, something like array_contains? I tried array_contains, but it did not work (in the Spark versions current at the time, array_contains expected a literal value as its second argument rather than a column):

from pyspark.sql.functions import array_contains
df1.where(array_contains("Reference_value", df1["value"]))

Answer

# One can copy-paste the code below to reproduce the input and outputs directly
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, size, split
from pyspark.sql.types import ArrayType, StringType

sc = SparkContext.getOrCreate()
sqlContext = SQLContext.getOrCreate(sc)

df1 = sc.parallelize([
    ("count", "1", "1"),
    ("n_timer", "n20", "n40,n20"),
    ("frames", "54", "56"),
    ("timer", "n8", "n3,n6,n7"),
    ("pdf", "FALSE", "TRUE"),
    ("zip", "FALSE", "FALSE"),
]).toDF(["category", "value", "Reference_value"])
df1.show()

# Split the comma-separated strings into array<string> columns
df1 = df1.withColumn("Reference_value", split("Reference_value", r",\s*"))
df1 = df1.withColumn("value", split("value", r",\s*"))

# UDF returning the intersection of two array columns (None-safe)
intersect = lambda typ: udf(
    lambda x, y: list(set(x) & set(y)) if x is not None and y is not None else None,
    ArrayType(typ))
string_intersect = intersect(StringType())

# Rows whose value overlaps the reference values, and those that do not
df2 = df1.where(size(string_intersect("value", "Reference_value")) > 0)
df3 = df1.where(size(string_intersect("value", "Reference_value")) <= 0)
df2.show()
df3.show()
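The UDF at the core of this answer is ordinary Python, so its intersection behavior can be sanity-checked without a Spark session (a minimal sketch; `list_intersect` is a hypothetical name for the inner lambda):

```python
# Same logic the answer wraps in a Spark UDF: intersect two lists,
# returning None when either input is None.
def list_intersect(x, y):
    return list(set(x) & set(y)) if x is not None and y is not None else None

print(list_intersect(["n20"], ["n40", "n20"]))  # overlap -> ["n20"]
print(list_intersect(["54"], ["56"]))           # no overlap -> []
print(list_intersect(None, ["1"]))              # missing input -> None
```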

input df1= 
+--------+-----+---------------+ 
|category|value|Reference_value| 
+--------+-----+---------------+ 
| count| 1|    1| 
| n_timer| n20|  n40,n20| 
| frames| 54|    56| 
| timer| n8|  n3,n6,n7| 
|  pdf|FALSE|   TRUE| 
|  zip|FALSE|   FALSE| 
+--------+-----+---------------+ 

df2= 
+--------+-------+---------------+ 
|category| value|Reference_value| 
+--------+-------+---------------+ 
| count| [1]|   [1]| 
| n_timer| [n20]|  [n40, n20]| 
|  zip|[FALSE]|  [FALSE]| 
+--------+-------+---------------+ 

df3= 
+--------+-------+---------------+ 
|category| value|Reference_value| 
+--------+-------+---------------+ 
| frames| [54]|   [56]| 
| timer| [n8]| [n3, n6, n7]| 
|  pdf|[FALSE]|   [TRUE]| 
+--------+-------+---------------+ 