2016-08-30

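Spark MLlib ALS input and output domains

I'm using Spark MLlib ALS and trying to use the trainImplicit() interface, feeding it the quantity of each item a user has purchased as the implicit preference. I don't know how to validate my model. My inputs are in the domain [1, inf), but the output predictions appear to be floats in (0, 1).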

The usual kind of code:

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating 
from pyspark.sql import HiveContext 
from pyspark import SparkContext 

sc = SparkContext(appName="Quantity Prediction Model") 
hive = HiveContext(sc) 

d = hive.sql("select o.user_id as user, l.product_id as product, sum(l.quantity) as qty from order_line l join order_order o ON l.order_id = o.id group by o.user_id, l.product_id") 
d.write.save('user_product_qty') 

ratings = d.rdd.map(tuple) 
testdata = ratings.map(lambda t: (t[0], t[1])) 

for rank in (4, 8, 12): 
    model = ALS.trainImplicit(ratings, rank, 10, alpha=0.01) 

    predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2])) 

    # Error is pretty bad because output ratings aren't in the same domain as quantity 
    ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions) 
    MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean() 

    print("Rank: {} MSE: {}".format(rank, MSE)) 

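Bonus question: when using train(), what is the input/output domain? Are the "ratings" expected to be on a five-point scale? This isn't documented anywhere.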

Answer


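RMSE is not an ideal metric for implicit ALS (the original paper suggests more elaborate evaluation techniques). However, you can still apply RMSE to evaluate implicit ALS training results if you map your input ratings to (-1; 1).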
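Some background on why the predictions land near (0, 1) while the input quantities do not: in the implicit-feedback formulation (Hu, Koren and Volinsky), the model is not fit to the raw quantity. Each observation is split into a binary preference, which is the regression target, and a confidence weight that grows with the quantity. A minimal plain-Python sketch of that split, assuming the paper's linear confidence; the function names are made up for illustration and are not part of the Spark API:

```python
# Illustrative only: how an implicit-feedback observation r (e.g. a purchase
# quantity) is interpreted. Function names are hypothetical, not Spark API.

def preference(r):
    # Binary target the factor model tries to reproduce: 1 if any interaction.
    return 1.0 if r > 0 else 0.0

def confidence(r, alpha):
    # Weight on the squared error for this observation; grows with r.
    return 1.0 + alpha * r

alpha = 0.01  # same value as in the question's trainImplicit() call
for qty in (0, 1, 3, 12):
    print(qty, preference(qty), confidence(qty, alpha))
```

So a quantity of 12 and a quantity of 1 share the same target (1.0) and differ only in how heavily the error is weighted, which is why comparing predictions directly against quantities gives a large MSE.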

For details, see https://github.com/apache/spark/pull/597
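As for the more elaborate evaluation mentioned above: the metric used in the implicit-feedback ALS paper is, as far as I recall, the expected percentile rank, where each held-out item's position in the user's ranked recommendation list is weighted by the observed quantity. Lower is better, and random ranking gives about 0.5. A rough plain-Python sketch, assuming the scores have been collected into dictionaries (the data layout and function name are my own choice, not anything from Spark):

```python
def expected_percentile_rank(scores, held_out):
    """Quantity-weighted mean percentile rank of held-out items.

    scores:   {user: {item: predicted score}} for all candidate items
    held_out: {(user, item): observed quantity} from the test period
    Assumes every held-out (user, item) also appears in `scores`.
    """
    weighted = 0.0
    total = 0.0
    for (user, item), qty in held_out.items():
        # Best-scored item first; percentile 0.0 = top of the list.
        ranked = sorted(scores[user], key=scores[user].get, reverse=True)
        n = len(ranked)
        pct = ranked.index(item) / (n - 1) if n > 1 else 0.0
        weighted += qty * pct
        total += qty
    return weighted / total

scores = {"u1": {"a": 0.9, "b": 0.5, "c": 0.1}}
print(expected_percentile_rank(scores, {("u1", "a"): 3, ("u1", "c"): 1}))
```

This only looks at the ordering of the predictions, so it does not care that they are on a different scale from the quantities.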

Finally, some code to get you started (inspired by the MovieLens ALS example from Spark):

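// Assumes `alsModel` (a trained MatrixFactorizationModel), `testingSet`
// (an RDD[MLlibRating]) and `logger` are defined elsewhere.
// MLlibRating is assumed to be an alias for MLlib's Rating class.
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}

// RMSE 
logger.info(s"Calculating RMSE on ${testingSet.count()} ratings") 
// Key each rating by its (user, product) pair so the two sets can be joined
def groupRatings(rs: RDD[MLlibRating]): RDD[((Int, Int), Double)] = 
    rs.map { r => ((r.user, r.product), r.rating) } 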

// When using implicit ALS we should treat actual and predicted 
// ratings as confidence levels. See also apache/spark#597. 
// 
// Predicted ratings are clamped to [0;1] 
def clampPredicted(r: Double): Double = 
    math.max(math.min(r, 1.0), 0.0) 

def clampActual(r: Double): Double = if (r > 0.0) 1.0 else 0.0 

def sqr(x: Double): Double = x * x 

val ratingSquaredErrors = 
    groupRatings(alsModel.predict(testingSet.map { r => (r.user, r.product) })) 
    .join(groupRatings(testingSet)) 
    .map { case (_, (predictedRating, actualRating)) => 
        sqr(clampPredicted(predictedRating) - clampActual(actualRating)) } 
val rmse = math.sqrt(ratingSquaredErrors.mean()) 
logger.info(s"RMSE: ${rmse}")
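Since the question uses PySpark, here is roughly the same clamping logic in plain Python. This is a sketch rather than a drop-in replacement: the function names mirror the Scala above, and `clamped_rmse` works on an ordinary list of (actual, predicted) pairs so it can be tried without a cluster.

```python
import math

def clamp_predicted(p):
    # Predicted scores are clamped to [0, 1]
    return max(min(p, 1.0), 0.0)

def clamp_actual(r):
    # Any positive quantity counts as "preferred"
    return 1.0 if r > 0.0 else 0.0

def clamped_rmse(pairs):
    """pairs: non-empty list of (actual, predicted) tuples."""
    errors = [(clamp_predicted(p) - clamp_actual(a)) ** 2 for a, p in pairs]
    return math.sqrt(sum(errors) / len(errors))

print(clamped_rmse([(3, 1.2), (1, 0.5)]))
```

In the question's loop, the values of ratesAndPreds are (actual, predicted) pairs, so the same thing would be something like math.sqrt(ratesAndPreds.map(lambda r: (clamp_predicted(r[1][1]) - clamp_actual(r[1][0]))**2).mean()). Note that if the test data contains only purchased items, every actual value becomes 1.0, so the number only tells you how close the predictions for known purchases are to 1.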