2017-04-25 49 views
0

我已經在此處看到類似於我的問題,但是在嘗試一些可接受的答案時,我的代碼仍然出現錯誤。我有一個包含三列的數據框 - 創建_at,文本和單詞(這只是文本的標記版本)。請看下圖:如果文本列包含指定列表中的單詞,則過濾pyspark數據框

enter image description here

現在,我公司['Starbucks', 'Nvidia', 'IBM', 'Dell']的名單,我只是想保持在文本包括上述的那些話行。

我已經嘗試了一些東西,但沒有成功:

small_DF.filter(lambda x: any(word in x.text for word in test_list)) 

返回:類型錯誤:條件應該是字符串或列

我試圖創建一個函數,並使用foreach()

def filters(line): 
    return(any(word in line for word in test_list)) 
df = df.foreach(filters) 

將df變成'Nonetype'

而最後一個我想:

df = df.filter((col("text").isin(test_list)) 

這將返回一個空的數據幀,因爲我沒有得到任何錯誤,這是很好的,但顯然不是我想要的。

回答

0

我認爲filter is not working becuase it expect a boolean output from lambda function and isin just with column。您正試圖將單詞列表與單詞列表進行比較。這裏是什麼,我想可以給你一些方向 -

# prepare some test data ==> 

words = [x.lower() for x in ['starbucks', 'Nvidia', 'IBM', 'Dell']] 
data = [['i love Starbucks'],['dell laptops rocks'],['help me I am stuck!']] 
df = spark.createDataFrame(data).toDF('text') 


from pyspark.sql.types import * 

def intersect(row): 
    # convert each word in lowecase 
    row = [x.lower() for x in row.split()] 
    return True if set(row).intersection(set(words)) else False 


filterUDF = udf(intersect,BooleanType()) 
df.where(filterUDF(df.text)).show() 

輸出:

+------------------+ 
|    text| 
+------------------+ 
| i love Starbucks| 
|dell laptops rocks| 
+------------------+ 
+0

我試過了你在我的數據框中寫的UDF,用我的數據替換了df.where和df.text;但是,我收到錯誤:AttributeError:'NoneType'對象沒有'split'屬性。 對於交集函數,從技術上說,你傳入一列(df.text)作爲參數,對嗎?是錯誤,因爲它不是逐行迭代? – sjc725

+0

你的列文本是否有空值,可以解釋爲NoneType錯誤。你將不得不在udf中處理這個「if在None中返回False」。對於相交的行值,即「我愛星巴克」。如果你發佈你已經嘗試過的樣本數據,這可能會有幫助。數據的圖像不是很有幫助。 – Pushkr

0

上dataframes你.filter返回一個錯誤,因爲它是SQL過濾功能(期待BooleanType()列)不RDD上的過濾功能。您列"text"

small_DF.rdd.filter(lambda x: any(word in x.text for word in test_list)) 

您不必使用UDF,你可以在pyspark使用正則表達式.rlike

from pyspark.sql import HiveContext 
hc = HiveContext(sc) 
import pyspark.sql.functions as psf 

words = [x.lower() for x in ['starbucks', 'Nvidia', 'IBM', 'Dell']] 
data = [['i love Starbucks'],['dell laptops rocks'],['help me I am stuck!']] 
df = hc.createDataFrame(data).toDF('text') 
df.filter(psf.lower(df.text).rlike('|'.join(words))) 
如果你想使用RDD一個,只需添加 .rdd
相關問題