I am running a regression/classification algorithm in PySpark and want to save the model output to a CSV file. I tried dataframe.saveAsTextFile('hdfs://a/b/x'), but it throws an AttributeError saying the list object has no saveAsTextFile attribute. Please see the code below and the noted failing line: how do I write a DataFrame's output to a CSV file in PySpark?
from __future__ import print_function
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import RandomForestClassifier as RF
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas
import numpy as np
#from matplotlib import pyplot as plt
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType
from sklearn.metrics import roc_curve, auc, accuracy_score, adjusted_rand_score, roc_auc_score
sc = SparkContext()
sqlContext = SQLContext(sc)
inData = sc.textFile("hdfs://ML_test_data/train2.csv")
header = inData.first()
fields = [StructField(field_name, StringType(), True) for field_name in header.split(',')]
fields[0].dataType = IntegerType()
fields[4].dataType = IntegerType()
fields[7].dataType = IntegerType()
fields[8].dataType = IntegerType()
#fields[9].dataType = IntegerType()
#fields[10].dataType = StructType()
fields[11].dataType = IntegerType()
schema = StructType(fields)
actualData = inData.filter(lambda x: x != header)
data_temp = actualData.map(lambda x: x.split(',')).map(lambda p: (int(p[0]), p[1], p[2], p[3], int(p[4]),
                                                                  p[5], p[6], int(p[7]), int(p[8]), p[9], p[10], int(p[11])))
df = sqlContext.createDataFrame(data_temp, schema)
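# Note: as an alternative (my assumption, not what I actually ran), the
# spark-csv package can parse the file straight into a DataFrame, which
# would skip the manual split/schema steps above:
# df = sqlContext.read.format('com.databricks.spark.csv') \
#     .options(header='true', inferSchema='true') \
#     .load("hdfs://ML_test_data/train2.csv")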
print (df.dtypes)
'''
df.createOrReplaceTempView("testTab")
result_df = sqlContext.sql("select Product_Category_2 from testTab")
result_df.show()
result_collect = result_df.collect()
for i in result_collect:
    avg_prod_cat2 = i
    print ("avg_prod_cat2", avg_prod_cat2)
'''
# Replace empty category values with the default string '2'
def prodCat2(originalCol):
    if originalCol == '':
        return '2'
    else:
        return originalCol
udfValProdCt2 = udf(prodCat2, StringType())
df = df.withColumn("prod_cat2", udfValProdCt2("Product_Category_2"))
df = df.withColumn("prod_cat3", udfValProdCt2("Product_Category_3"))
print (df.dtypes)
df = df.drop("Product_Category_2")
df = df.drop("Product_Category_3")
print (df.dtypes)
df.show()
#p_df = df.toPandas()
#print (p_df.head(15))
column_vec_in = ['User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years',
                 'Marital_Status', 'Product_Category_1', 'Purchase', 'prod_cat2', 'prod_cat3']
column_vec_out = ['User_ID_catVec', 'Product_ID_catVec', 'Gender_catVec', 'Age_catVec', 'Occupation_catVec', 'City_Category_catVec',
                  'Stay_In_Current_City_Years_catVec', 'Marital_Status_catVec', 'Product_Category_1_catVec',
                  'Purchase_catVec', 'prod_cat2_catVec', 'prod_cat3_catVec']
indexers = [StringIndexer(inputCol=x, outputCol=x +'_tmp') for x in column_vec_in]
encoders = [OneHotEncoder(dropLast=False, inputCol=x+'_tmp', outputCol=y) for x,y in zip(column_vec_in, column_vec_out)]
tmp = [[i,j] for i, j in zip(indexers, encoders)]
tmp = [i for sublist in tmp for i in sublist]
cols_now = ['Product_ID_catVec', 'Gender_catVec', 'Age_catVec', 'Occupation_catVec', 'City_Category_catVec',
            'Stay_In_Current_City_Years_catVec', 'Product_Category_1', 'Purchase', 'prod_cat2_catVec', 'prod_cat3_catVec']
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features')
labelIndexer = StringIndexer(inputCol='Purchase', outputCol="label")
tmp += [assembler_features, labelIndexer]
pipeline = Pipeline(stages=tmp)
pipelineModel = pipeline.fit(df)
allData = pipelineModel.transform(df)
allData.cache()
trainingData, testData = allData.randomSplit([0.7,0.3], seed=4)
print (trainingData.count())
print(testData.count())
rf = RF(labelCol='label', featuresCol='features', numTrees=20)
rfModel = rf.fit(trainingData)
#print("Coefficients : \n" + str(rfModel.coefficientMatrix))
predictions = rfModel.transform(testData)
predictions.printSchema()
result = predictions.select('User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years',
                            'Marital_Status', 'Product_Category_1', 'Purchase', 'prod_cat2', 'prod_cat3', 'label',
                            'probability', 'prediction').collect()
# This is the line that fails: collect() returns a plain Python list, so it raises
# AttributeError: 'list' object has no attribute 'saveAsTextFile'
result.saveAsTextFile()
for row in result:
    print("User_ID= %s, Product_id= %s -> prob= %s, prediction= %s" % (row.User_ID, row.Product_ID, row.probability, row.prediction))
Thanks in advance for your quick help.