
My goal is to add a custom evaluator, based on a ranking metric, to CrossValidator (PySpark) so that it is used during Spark cross-validation:

cvExplicit = CrossValidator(estimator=cvSet, numFolds=8, estimatorParamMaps=paramMap, evaluator=rnkEvaluate)

The predictions DataFrame needs to be passed into the evaluator's evaluate function, but I don't know how to do that part:

class rnkEvaluate():
    def __init__(self, user_col="user", rating_col="rating", prediction_col="prediction"):
        self._user_col = user_col
        self._rating_col = rating_col
        self._prediction_col = prediction_col

    def isLargerBetter(self):
        return True

    def evaluate(self, predictions):
        # sum of the rating column over the whole predictions DataFrame
        denominator = predictions.groupBy().sum(self._rating_col).collect()[0][0]
        # TODO rest of the calculation ...
        return numerator / denominator

Somehow I need to pass the predictions DataFrame to the evaluator on every fold, but I haven't managed to do it.
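For reference, PySpark's CrossValidator calls evaluator.evaluate(predictions) on each fold's predictions DataFrame, so one way to receive that DataFrame is to subclass pyspark.ml.evaluation.Evaluator and implement _evaluate. Below is a minimal sketch; the class name RankBasedEvaluator is illustrative, and the rating-weighted percentile-rank metric is an assumption standing in for the TODO above (for that metric smaller is better, hence isLargerBetter returns False):

from pyspark.ml.evaluation import Evaluator
from pyspark.sql import functions as F
from pyspark.sql.window import Window

class RankBasedEvaluator(Evaluator):
    def __init__(self, user_col="user", rating_col="rating", prediction_col="prediction"):
        super(RankBasedEvaluator, self).__init__()
        self.user_col = user_col
        self.rating_col = rating_col
        self.prediction_col = prediction_col

    def isLargerBetter(self):
        # rating-weighted percentile rank: smaller means relevant items rank higher
        return False

    def _evaluate(self, dataset):
        # CrossValidator passes each fold's predictions DataFrame in here.
        # Percentile rank of each item within its user's list (0.0 = ranked first).
        w = Window.partitionBy(self.user_col).orderBy(F.desc(self.prediction_col))
        ranked = dataset.withColumn("perc_rank", F.percent_rank().over(w))
        # sum(rating * rank) / sum(rating)
        row = ranked.agg(
            F.sum(F.col(self.rating_col) * F.col("perc_rank")).alias("num"),
            F.sum(F.col(self.rating_col)).alias("den")).collect()[0]
        return row["num"] / row["den"]

An instance of such an evaluator could then be passed directly, e.g. evaluator=RankBasedEvaluator("user", "rating", "prediction"), without overriding CrossValidator itself.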

Answer


I have solved this problem; the code I ended up with is below:

import numpy as np 

from pyspark.ml.tuning import CrossValidator, CrossValidatorModel 
from pyspark.sql.functions import rand 

result = []

class CrossValidatorVerbose(CrossValidator):

    def writeResult(self, result):
        # append the result line to a log file
        with open('executions/results.txt', 'a') as resfile:
            resfile.write("\n")
            resfile.write(result)

    def _fit(self, dataset):
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        numModels = len(epm)

        eva = self.getOrDefault(self.evaluator)
        metricName = eva.getMetricName()

        nFolds = self.getOrDefault(self.numFolds)
        seed = self.getOrDefault(self.seed)
        h = 1.0 / nFolds

        # add a random column used to split the data into folds
        randCol = self.uid + "_rand"
        df = dataset.select("*", rand(seed).alias(randCol))
        metrics = [0.0] * numModels

        for i in range(nFolds):
            foldNum = i + 1
            print("Comparing models on fold %d" % foldNum)

            # rows whose random value falls in [validateLB, validateUB) form the validation set
            validateLB = i * h
            validateUB = (i + 1) * h
            condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
            validation = df.filter(condition)
            train = df.filter(~condition)

            for j in range(numModels):
                paramMap = epm[j]
                model = est.fit(train, paramMap)

                # the per-fold predictions DataFrame is handed to the custom evaluator here
                predictions = model.transform(validation, paramMap)
                # predictions.show()
                metric = eva.evaluate(spark=spark, predictions=predictions)  # signature of the custom RankUserWeighted evaluator
                metrics[j] += metric

                avgSoFar = metrics[j] / foldNum

                res = ("params: %s\t%s: %f\tavg: %f" % (
                    {param.name: val for (param, val) in paramMap.items()},
                    metricName, metric, avgSoFar))
                self.writeResult(res)
                result.append(res)
                print(res)

        if eva.isLargerBetter():
            bestIndex = np.argmax(metrics)
        else:
            bestIndex = np.argmin(metrics)

        # refit the best parameter combination on the full dataset
        bestParams = epm[bestIndex]
        bestModel = est.fit(dataset, bestParams)
        avgMetrics = [m / nFolds for m in metrics]
        bestAvg = avgMetrics[bestIndex]
        print("Best model:\nparams: %s\t%s: %f" % (
            {param.name: val for (param, val) in bestParams.items()},
            metricName, bestAvg))

        return self._copyValues(CrossValidatorModel(bestModel, avgMetrics))


evaluator = RankUserWeighted("user", "rating", "prediction")

cvImplicit = CrossValidatorVerbose(estimator=customImplicit, numFolds=8,
                                   estimatorParamMaps=paramMap, evaluator=evaluator)
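For completeness, here is a sketch of how this cross-validator might be run end to end; customImplicit, paramMap and ratingsDF are assumptions (an implicit ALS estimator, a ParamGridBuilder grid and a user/item/rating DataFrame), since they are not defined in the snippets above:

from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder

# hypothetical stand-ins for the estimator and parameter grid used above
customImplicit = ALS(userCol="user", itemCol="item", ratingCol="rating", implicitPrefs=True)
paramMap = (ParamGridBuilder()
            .addGrid(customImplicit.rank, [10, 20])
            .addGrid(customImplicit.regParam, [0.01, 0.1])
            .build())

cvImplicit = CrossValidatorVerbose(estimator=customImplicit, numFolds=8,
                                   estimatorParamMaps=paramMap, evaluator=evaluator)

# fit() runs the overridden _fit above; a SparkSession named `spark` must exist
cvModel = cvImplicit.fit(ratingsDF)   # ratingsDF: DataFrame with user/item/rating columns (assumed)
print(cvModel.avgMetrics)             # average metric per parameter combination
bestModel = cvModel.bestModel         # model refit on the full dataset with the best parameters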