2016-12-21 85 views
0

我對Spark很新,我一直在嘗試讓Spark瞭解我的JSON輸入,但我一直沒有管理過。總之,我正在使用Spark的ALS算法來給出建議。當我提供一個csv文件作爲輸入時,一切正常。然而,我的輸入實際上是一個JSON,具體如下:JSON到Python中的Spark RDD

all_user_recipe_rating = [{'rating': 1, 'recipe_id': 8798, 'user_id': 2108}, {'rating': 4, 'recipe_id': 6985, 'user_id': 4236}, {'rating': 4, 'recipe_id': 13572, 'user_id': 2743}, {'rating': 4, 'recipe_id': 6312, 'user_id': 3156}, {'rating': 1, 'recipe_id': 12836, 'user_id': 768}, {'rating': 1, 'recipe_id': 9237, 'user_id': 1599}, {'rating': 2, 'recipe_id': 16946, 'user_id': 2687}, {'rating': 2, 'recipe_id': 20728, 'user_id': 58}, {'rating': 4, 'recipe_id': 12921, 'user_id': 2221}, {'rating': 2, 'recipe_id': 10693, 'user_id': 2114}, {'rating': 2, 'recipe_id': 18301, 'user_id': 4898}, {'rating': 2, 'recipe_id': 9967, 'user_id': 3010}, {'rating': 2, 'recipe_id': 16393, 'user_id': 4830}, {'rating': 4, 'recipe_id': 14838, 'user_id': 583}] 

ratings_RDD = self.spark.parallelize(all_user_recipe_rating) 

ratings = ratings_RDD.map(lambda row: 
    (Rating(int(row['user_id']), 
    int(row['recipe_id']), 
    float(row['rating'])))) 

model = self.build_model(ratings) 

這是我想出了看到一些例子之後,但是這是我得到:

MatrixFactorizationModel: User factor is not cached. Prediction could be slow. 
16/12/21 03:54:53 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow. 
16/12/21 03:54:53 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow. 
16/12/21 03:54:53 WARN MatrixFactorizationModelWrapper: User factor does not have a partitioner. Prediction on individual records could be slow. 

而且

File "/usr/local/spark/python/pyspark/mllib/recommendation.py", line 147, in <lambda> 
user_product = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1]))) 
TypeError: int() argument must be a string or a number, not 'Rating' 

有人能幫我一下嗎? :) 謝謝!

回答

0

那麼,

你的錯誤發生是由於一件事。

您受到的這個例外是關於ALS function的功能predictAll

這裏的問題是,你想一個評級對象發送到需要接收一個RDD<int, int>

我把你的代碼,並建立你所需要的一個功能:

>>> from pyspark.mllib.recommendation import Rating 
>>> ratings = ratings_RDD.map(lambda row: 
... (Rating(int(row['user_id']), 
... int(row['recipe_id']), 
... float(row['rating'])))) 
>>> model = ALS.trainImplicit(ratings, 1, seed=10) 
>>> to_predict = spark.parallelize([[2108, 16393], [583, 20728]]) 
>>> model.predictAll(to_predict).take(2) 
[Rating(user=583, product=20728, rating=0.0741161997082127), Rating(user=2108, product=16393, rating=0.05669039815320609)] 

你JSON是沒有錯,當你撥打predictAll,你發送的是Rating對象而不是RDD<int, int>