2016-05-14 32 views
0

我使用帶PySpark的Jupyter Notebook。在那裏我有一個數據框架,這些數據架構有一個列名和類型(整數,...)的列。現在我使用flatMap這樣的方法,但是這會返回一個沒有固定類型的元組列表。有沒有辦法實現這一點?PySpark平面圖應該返回帶有類型值的元組

df.printSchema() 
root 
|-- name: string (nullable = true) 
|-- ... 
|-- ... 
|-- ratings: integer (nullable = true) 

然後我用flatMap做的額定值一些計算(這裏混淆):

df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)) 
y_rate.toDF().printSchema() 

現在我得到一個錯誤:

TypeError: Can not infer schema for type:

有什麼辦法通過保持模式使用map/flatMap/reduce?或者至少返回具有特定類型值的元組?

回答

1

首先,您使用的是錯誤的功能。 flatMapmapflatten所以假設你的數據是這樣的:

df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"]) 

flatMap的輸出將等同於:

sc.parallelize(['foo', 0, 'bar', 5]) 

因此,你看到的錯誤。如果你真的想使它工作,你應該使用map

df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF() 
## DataFrame[_1: string, _2: bigint] 

接着,在DataFrame映射不再2.0支持。您應首先提取rdd(請參閱上面的df.rdd.map)。

最後在Python和JVM之間傳遞數據效率極低。它不僅需要在Python和JVM之間傳遞數據以及相應的序列化/反序列化和模式推理(如果沒有明確提供模式),這也會打破懶惰。這是更好地使用SQL表達式這樣的事情:

from pyspark.sql.functions import when 

df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings)) 

如果由於某種原因,你需要普通的Python代碼的UDF可能是一個更好的選擇。

+0

非常有幫助。感謝您的示例代碼。我只是沒有得到flatMap vs Map的部分。 – Matthias

+1

'flatMap'是一個函數'RDD [T] =>(T => Iterable [U])=> RDD [U]'。換句話說,它期望函數返回'Itereble'(Python元組),並連接這些(變平)結果。 – zero323

+0

有沒有辦法在該聲明中給出when/otherwise列的名稱?請參閱'df.select(df.id,when(df.ratings> 5,5).otherwise(df.ratings))'@ zero323 – Matthias

相關問題