我正在與pyspark一起使用與本文結尾處顯示的模式相稱的模式(注意嵌套列表,無序的字段),最初從Parquet作爲DataFrame導入。基本上,我遇到的問題是無法將此數據作爲RDD處理,然後轉換回DataFrame。 (我已經審查了若干相關的帖子,但我現在還不能告訴我要去的地方錯了)Pyspark RDD與強制架構的數據框:值錯誤
中平凡,下面的代碼工作正常(如人們所期望的):
schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
tripDFNew = sqlContext.createDataFrame(tripRDD, schema)
tripDFNew.take(1)
事情不工作時我需要映射RDD(例如添加一個字段的情況)。
schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
def trivial_map(row):
rowDict = row.asDict()
return pyspark.Row(**rowDict)
tripRDDNew = tripRDD.map(lambda row: trivial_map(row))
tripDFNew = sqlContext.createDataFrame(tripRDDNew, schema)
tripDFNew.take(1)
上面的代碼給出以下例外其中XXX是一個獨立的爲一個整數,從運行改變爲運行(例如,我已經看到1,16,23,等等):
File "/opt/cloudera/parcels/CDH-5.8.3-
1.cdh5.8.3.p1967.2057/lib/spark/python/pyspark/sql/types.py", line 546, in
toInternal
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple XXX with StructType`
鑑於此信息,是否在第二個代碼塊有明顯錯誤? (我注意到tripRDD屬於rdd.RDD類,而tripRDDw屬於rdd.PipelinedRDD類,但我認爲這不應該成爲問題。)(我還注意到tripRDD的模式不是按字段名排序的,而對於tripRDDNew架構由場名稱排序再說一遍,我不明白爲什麼這會是一個問題)
架構:。
root
|-- foo: struct (nullable = true)
| |-- bar_1: integer (nullable = true)
| |-- bar_2: integer (nullable = true)
| |-- bar_3: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- baz_1: integer (nullable = true)
| | | |-- baz_2: string (nullable = true)
| | | |-- baz_3: double (nullable = true)
| |-- bar_4: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- baz_1: integer (nullable = true)
| | | |-- baz_2: string (nullable = true)
| | | |-- baz_3: double (nullable = true)
|-- qux: integer (nullable = true)
|-- corge: integer (nullable = true)
|-- uier: integer (nullable = true)`