2017-05-30 113 views
0

我有一些字典和函數定義:如何在Pyspark中使用具有列功能的數據框中的函數?

dict_TEMPERATURE = {(0, 70): 'Low', (70.01, 73.99): 'Normal-Low',(74, 76): 'Normal', (76.01, 80): 'Normal-High', (80.01, 300): 'High'} 
... 
hierarchy_dict = {'TEMP': dict_TEMPERATURE, 'PRESS': dict_PRESSURE, 'SH_SP': dict_SHAFT_SPEED, 'POI': dict_POI, 'TRIG': dict_TRIGGER} 



def function_definition(valor, atributo): 

    dict_atributo = hierarchy_dict[atributo] 
    valor_generalizado = None 

    if isinstance(valor, (int, long, float, complex)): 

     for key, value in dict_atributo.items(): 

      if(isinstance(key, tuple)): 
       lista = list(key) 

       if (valor > key[0] and valor < key[1]): 
        valor_generalizado = value 

    else: # if it is not numeric 
     valor_generalizado = dict_atributo.get(valor) 


    return valor_generalizado 

什麼這個功能基本上做的是:檢查被作爲參數傳遞給了「function_definition」功能傳遞的價值,並根據代替它的價值它的字典的引用。因此,如果我調用「function_definition(60,'TEMP')」,它將返回'LOW'。

在另一方面,我有下一個結構的數據框(這是一個例子):

+----+-----+-----+---+----+ 
|TEMP|SH_SP|PRESS|POI|TRIG| 
+----+-----+-----+---+----+ 
| 0| 1| 2| 0| 0| 
| 0| 2| 3| 1| 1| 
| 0| 3| 4| 2| 1| 
| 0| 4| 5| 3| 1| 
| 0| 5| 6| 4| 1| 
| 0| 1| 2| 5| 1| 
+----+-----+-----+---+----+ 

我想要做的是替換數據幀中的一列的基礎上,該值功能如上所定義的,所以我有下一個代碼行:

dataframe_new = dataframe.withColumn(atribute_name, function_definition(dataframe[atribute_name], atribute_name)) 

但在執行時,它我獲得下一個錯誤消息:

AssertionError: col should be Column 

我的代碼有什麼問題?怎麼可能做到這一點?

回答

1

function_definition(勇武,atributo)返回一個英勇一個String(valor_generalizado)。

AssertionError:col應爲列意味着您傳遞的參數不是列的WithColumn(colName,col)。 所以你必須改變你的數據,爲了有,例如你可以看到下面。

數據框例如(相同的結構,你的):

a = [(10.0,1.2),(73.0,4.0)] # like your dataframe, this is only an example 

dataframe = spark.createDataFrame(a,["tp", "S"]) # tp and S are random names for these columns 

dataframe.show() 
+----+---+ 
| tp| S| 
+----+---+ 
|10.0|1.2| 
|73.0|4.0| 
+----+---+ 

正如你可以看到here

udf Creates a Column expression representing a user defined function (UDF).

解決方案:

from pyspark.sql.functions import udf 

attr = 'TEMP' 
udf_func = udf(lambda x: function_definition(x,attr),returnType=StringType()) 

dataframe_new = dataframe.withColumn("newCol",udf_func(dataframe.tp)) 
dataframe_new.show() 

+----+---+----------+ 
| tp| S| newCol| 
+----+---+----------+ 
|10.0|1.2|  Low| 
|73.0|4.0|Normal-Low| 
+----+---+----------+ 
+0

謝謝你很多!那就是我一直在尋找的東西! – jartymcfly

相關問題