2017-05-07 47 views
2

我正在與pyspark一起使用與本文結尾處顯示的模式相稱的模式(注意嵌套列表,無序的字段),最初從Parquet作爲DataFrame導入。基本上,我遇到的問題是無法將此數據作爲RDD處理,然後轉換回DataFrame。 (我已經審查了若干相關的帖子,但我現在還不能告訴我要去的地方錯了)Pyspark RDD與強制架構的數據框:值錯誤

中平凡,下面的代碼工作正常(如人們所期望的):

schema = deepcopy(tripDF.schema) 
tripRDD = tripDF.rdd 
tripDFNew = sqlContext.createDataFrame(tripRDD, schema) 
tripDFNew.take(1) 

事情不工作時我需要映射RDD(例如添加一個字段的情況)。

schema = deepcopy(tripDF.schema) 
tripRDD = tripDF.rdd 
def trivial_map(row): 
    rowDict = row.asDict() 
    return pyspark.Row(**rowDict) 
tripRDDNew = tripRDD.map(lambda row: trivial_map(row)) 
tripDFNew = sqlContext.createDataFrame(tripRDDNew, schema) 
tripDFNew.take(1) 

上面的代碼給出以下例外其中XXX是一個獨立的爲一個整數,從運行改變爲運行(例如,我已經看到1,16,23,等等):

File "/opt/cloudera/parcels/CDH-5.8.3- 
1.cdh5.8.3.p1967.2057/lib/spark/python/pyspark/sql/types.py", line 546, in 
toInternal 
raise ValueError("Unexpected tuple %r with StructType" % obj) 
ValueError: Unexpected tuple XXX with StructType` 

鑑於此信息,是否在第二個代碼塊有明顯錯誤? (我注意到tripRDD屬於rdd.RDD類,而tripRDDw屬於rdd.PipelinedRDD類,但我認爲這不應該成爲問題。)(我還注意到tripRDD的模式不是按字段名排序的,而對於tripRDDNew架構由場名稱排序再說一遍,我不明白爲什麼這會是一個問題)

架構:。

root 
|-- foo: struct (nullable = true) 
| |-- bar_1: integer (nullable = true) 
| |-- bar_2: integer (nullable = true) 
| |-- bar_3: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- baz_1: integer (nullable = true) 
| | | |-- baz_2: string (nullable = true) 
| | | |-- baz_3: double (nullable = true) 
| |-- bar_4: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- baz_1: integer (nullable = true) 
| | | |-- baz_2: string (nullable = true) 
| | | |-- baz_3: double (nullable = true) 
|-- qux: integer (nullable = true) 
|-- corge: integer (nullable = true) 
|-- uier: integer (nullable = true)` 

回答

2

正如在文章中指出,原來的模式具有字段不按字母順序排列。這就是問題所在。在映射函數中使用.asDict()會對生成的RDD的字段進行排序。 tripRDDNew的字段順序在調用createDataFrame時與模式衝突。 ValueError的結果是試圖將其中一個整型字段(即示例中的qux,corge或uier)解析爲StructType。 (另外:createDataFrame要求模式字段的順序與RDD字段的順序相同,你應該需要一致的字段名稱或者一致的字段順序,但是這兩者似乎都是過度的)。 (作爲第二個方面:DataFrame中非字母字段的存在有些不正常,例如,sc.parallelize()會在分配數據結構時按字母順序排序字段,看起來像字段應該是在從鑲木地板文件中導入數據幀時進行排序。調查爲什麼不是這種情況可能很有趣)。