2017-03-01 52 views
0

我從SQL數據框中添加:如何將新列到火花數據幀從函數值(使用PySpark)

log = hc.sql("""select 
        , ip 
        , url 
        , ymd 
        from log """) 

和功能,從數據幀適用「IP」的價值和返回三個值:

def get_loc(ip): 
geodata = GeoLocator('SxGeoCity.dat', MODE_BATCH | MODE_MEMORY) 
result = [] 

location = geodata.get_location(ip, detailed=True) 
city_name_en = str(processValue(location['info']['city']['name_en'])) 
region_name_en = str(processValue(location['info']['region']['name_en'])) 
country_name_en = str(processValue(location['info']['country']['name_en'])) 

result = [city_name_en, region_name_en, country_name_en] 

return result 

我不知道如何通過價值的功能get_loc(),並添加返回值作爲地圖欄「屬性」,以現有的數據幀。使用python 2.7和PySpark。

回答

0

我不知道get_loc做了什麼。

但是你可以使用UDF如下:

from pyspark.sql import functions as f 

def get_loc(ip): 
    return str(ip).split('.') 

rdd = spark.sparkContext.parallelize([(1, '192.168.0.1'), (2, '192.168.0.1')]) 
df = spark.createDataFrame(rdd, schema=['idx', 'ip']) 
My_UDF = f.UserDefinedFunction(get_loc, returnType=ArrayType(StringType())) 
df = df.withColumn('loc', My_UDF(df['ip'])) 
df.show() 

# output: 
+---+-----------+----------------+ 
|idx|   ip|    loc| 
+---+-----------+----------------+ 
| 1|192.168.0.1|[192, 168, 0, 1]| 
| 2|192.168.0.1|[192, 168, 0, 1]| 
+---+-----------+----------------+ 
+0

謝謝!但我有一個錯誤:** java.lang.AssertionError:斷言失敗:無法評估PythonUDF。缺少輸入屬性**我有一個問題將值傳遞給get_loc()。 – Fred

+0

我更新了代碼 –

+0

非常感謝!我已經將火花的版本從1.3.0更改爲1.5.1,並且所有的作品。 – Fred