2017-06-01 172 views
0

我有一個元組的RDD,其中前兩行是這樣的:Pyspark轉換RDD元組到數據幀

[[('n', 12.012457082117459), ('s', 0.79112758892014912)], 
[('t', 3.6243409329763652),('vn', 3.6243409329763652),('n', 52.743253562212828),('v', 11.644347760553064)]] 

在每個元組中,第一值,例如:「N」,「S」, 't'是所需的列名稱,第二個值,例如:12.012,0.7911 ....是每列所需的值。但是,在rdd的每個列表(行)中,我們可以看到並非所有列名都存在。例如,在第一行中,只有

'n', 's' 

出現,雖然沒有

's' 
第二排

。所以我想將這個rdd轉換爲一個數據框,其中值不應該顯示在原始元組中的值應該爲0。換句話說,前兩行可能是這樣的:

n  s  t  vn  omitted..... 
12 0.79 0  0  ..... 
52 0  3.62 3.62 ....... 

我嘗試以下操作:

row = Row('l','eng','q','g','j','b','nt','z','n','d','f','i','k','s','vn','nz','v','nrt','tg','nrfg','t','ng','zg','a') 
df = tup_sum_data.map(row).toDF() 

凡行字符串()是我想要的列名。但我得到以下錯誤:

TypeError         Traceback (most recent call last) 
/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj) 
968   try: 
--> 969    return _infer_schema(obj) 
970   except TypeError: 

/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_schema(row) 
991  else: 
--> 992   raise TypeError("Can not infer schema for type: %s" % type(row)) 
993 

TypeError: Can not infer schema for type: <class 'numpy.float64'> 
During handling of the above exception, another exception occurred: 
TypeError         Traceback (most recent call last) 
/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj) 
968   try: 
--> 969    return _infer_schema(obj) 
970   except TypeError: 

/Users/1/Documents/spark/python/pyspark/sql/types.py in _infer_type(obj) 
969    return _infer_schema(obj) 
970   except TypeError: 
--> 971    raise TypeError("not supported type: %s" % type(obj)) 
972 
973 

TypeError: not supported type: <class 'tuple'> 

錯誤代碼中的某些行被省略。任何人都可以幫我弄清楚如何處理這個問題?謝謝 !

UPDATE 我將數據類型從np.float64轉換爲float,並且沒有錯誤。但是,數據框看起來並不像我想要的那樣;它看起來是這樣的:

+--------------------+ 
|     l| 
+--------------------+ 
|[[n,12.0124570821...| 
|[[t,3.62434093297...| 
|[[a,0.44628710262...| 
|[[n,16.7534769832...| 
|[[n,17.6017774340...| 
+--------------------+ 
only showing top 5 rows 

因此,誰能幫助我如何得到正確格式的數據幀?謝謝 !

回答

1
from pyspark.sql.types import * 
from pyspark.sql import * 

data_frame_schema = StructType([ 
    StructField("n", FloatType()), 
    StructField("s", FloatType()), 
    StructField("t", FloatType()), 
    StructField("v", FloatType()), 
    StructField("vn", FloatType()) 
]) 

raw_list = [[('n', 12.012457082117459), ('s', 0.79112758892014912)], \ 
[('t', 3.6243409329763652),('vn', 3.6243409329763652),('n', 52.743253562212828),('v', 11.644347760553064)]] 

raw_rdd = sc.parallelize(raw_list) 

# dict_to_row = lambda d: Row(n=d.get("n"), s=d.get("s"), t=d.get("t"), v=d.get("v"), vn=d.get("vn")) 
dict_to_row = lambda d: Row(n=d.get("n", 0.0), s=d.get("s", 0.0), t=d.get("t", 0.0), v=d.get("v", 0.0), vn=d.get("vn", 0.0)) 

row_rdd = raw_rdd.map(lambda l: dict_to_row(dict(l))) 
df = spark.createDataFrame(row_rdd, data_frame_schema) 
df.show() 

粘貼到上述的pyspark殼產量輸出:

+---------+----------+--------+---------+--------+ 
|  n|   s|  t|  v|  vn| 
+---------+----------+--------+---------+--------+ 
|12.012457|0.79112756|  0.0|  0.0|  0.0| 
| 52.74325|  0.0|3.624341|11.644348|3.624341| 
+---------+----------+--------+---------+--------+ 
+0

這個工作!非常感謝。順便說一下,有沒有方法將null轉換爲0? – Parker

+0

剛剛更新的答案將缺失值用於0.0。那樣有用嗎? – clay

+0

工作很好!謝謝 – Parker