2016-04-21 61 views
0

我想解析一個SQL查詢,並且想調用一個數據框的每一行的函數。功能爲如下:__getattr__錯誤,同時調用foreach中的數據幀在pyspark

def updateParser(df): 
# update tab1 set value1 = 0.34 where id = 1111 
# identify positions 
setPos = df.select(instr(df.query, ' set ').alias('set')).collect()[0].set 
wherePos = df.select(instr(df.query, ' where ').alias('where')).collect()[0].where 
idPos = df.select(instr(df.query, ' id').alias('id')).collect()[0].id 

# identify table, fields&values, id 
df = df.withColumn('table', upper(trim(df.query.substr(7, setPos - 7)))) 
df = df.withColumn('fieldValueList', upper(trim(df.query.substr(setPos + 5, (wherePos - (setPos + 5) + 1))))) 
df = df.withColumn('id', upper(trim(df.query.substr(idPos + 5, 10)))) 
#identify the column being updated and the value 
df.show(n=5, truncate = False) 

而且我通過調用這個:

updateDF.foreach(updateParser) 

但我得到下面的錯誤:

File "/home/mapr/scripts/cdc.py", line 19, in updateParser 
setPos = df.select(instr(df.query, ' set ').alias('set')).collect()[0].set 
    File "/opt/mapr/spark/spark-1.5.2/python/lib/pyspark.zip/pyspark/sql/types.py", line 1257, in __getattr__ 
raise AttributeError(item) 
AttributeError: select 

我不使用GETATTR任何地方。這是否需要?如果我不使用foreach並直接在數據框上運行它,那麼它運行良好。任何人都可以請指教。

+0

a)這是無效的Python代碼(至少需要修正縮進)b)如果'updateDF'是'DataFrame',則這不是有效的Spark代碼。 – zero323

+0

縮進失去了,因爲它是從vi編輯器複製的,並且代碼在pyspark中運行良好,並且在CLI和pyspark作業中進行了測試。 – learning

回答

0

我發現問題 - 因爲我爲每一行調用一個數據框,所以我無法在每一行上使用df.select。相反,我需要使用Row對象及其方法。這就是屬性錯誤的原因,選擇提供錯誤,因爲它不是一個有效的操作。