2017-06-11 138 views
1

這個問題出現以及踏過我開始尋求幫助,但我還沒有找到一個解決辦法。事實上,你可能會發現你看到的可能的重複數,但我想我把它們都試過在最後幾個小時。據我所知,sqlContext將在這裏做的伎倆,但我接受任何有效的答案。我使用的Spark 2.1從RDD管道DF Pyspark

我開始與我從MongoDB的向下拉動的ID列表。 輸出示例:

[u'182028', u'161936', u'12333', u'120677'] 
'rated_game_ids_lst type:' <type 'list'> 

我然後繼續前進,試圖創造,我要變成一個DF的RDD:

user_unrated_games = ugr_rdd.filter(lambda x: x[1] not in rated_game_ids_lst).map(lambda x: (19, x[1], x[2])) 

輸出示例:

'user_unrated_games:' [(19, u'174430', 3.4), (19, u'169786', 3.4)] 
'user_unrated_games type:' <class 'pyspark.rdd.PipelinedRDD'> 

和樣本我以上使用urg_rdd(第一行):

'ugr_rdd:'[Row(user_id=5, game_id=u'182028', rating=9.15)] 
'ugr_rdd_type:' pyspark.rdd.RDD 

我再試試這個:

df = sqlContext.createDataFrame(user_unrated_games, ['user_id', 'game_id', 'rating']) 

這種方法失敗,所以我嘗試這樣做:

user_unrated_games = ugr_rdd.filter(lambda x: x[1] not in rated_game_ids_lst).map(lambda x: Row(user_id=19, game_id=x[1], rating= x[2])) 

輸出示例:

('user_unrated_games type:', <class 'pyspark.rdd.PipelinedRDD'>) 
('user_unrated_games:', [Row(game_id=u'174430', rating=3.4, user_id=19), Row(game_id=u'169786', rating=3.4, user_id=19)]) 

,然後這樣的:

df = sqlContext.createDataFrame(user_unrated_games) 

這兩個嘗試分給這個錯誤:

IllegalArgumentException: u"Error while instantiating 'org.apache.spark.sql.hive.HiveSessionState':" 

從那裏,我開始嘗試改變類型的「USER_ID」的組合等,試圖傳遞RDD原樣,試圖在我的管道轉換爲RDD ......坦率地說我嘗試了很多東西,但是上面的兩個看起來最接近似乎爲其他人工作的東西。

回答

1

的問題是,你需要指定架構包括數據類型調用createDataFrame方法時。像這樣的應該做的伎倆:

from pyspark.sql.types import * 

rdd = sc.parallelize([(19, 174430, 3.4), (19, 169786, 3.4)]) 

schema = StructType([ 
    StructField('user_id', IntegerType()), 
    StructField('game_id', IntegerType()), 
    StructField('rating', FloatType()) 
    ]) 

df = spark.createDataFrame(rdd, schema) 

df.show() 

注意:我已經測試了這個使用火花2.1.0。在這種情況下sparkSparkSession對象。

+0

請注意這裏重要的一點:要轉換的rdd中的變量類型需要與您傳遞給createDataFrame的模式相匹配 – Jomonsugi