
I am using pyspark for a regression/classification algorithm and want to save the model output to a CSV file. I tried dataframe.saveAsTextFile('hdfs://a/b/x'), but it throws an error saying that a list has no saveAsTextFile attribute. Please see the code below and advise: how do I write the output of a DataFrame to a CSV file in pyspark?

from __future__ import print_function 
from pyspark import SparkContext 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 
from pyspark.ml import Pipeline, PipelineModel 
from pyspark.ml.classification import RandomForestClassifier as RF 
from pyspark.ml.classification import DecisionTreeClassifier 
from pyspark.ml.clustering import KMeans 
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, SQLTransformer 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator 
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric 
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 
import pandas 
import numpy as np 
#from matplotlib import pyplot as plt 
from pyspark.ml.feature import OneHotEncoder 
from pyspark.sql.functions import udf 
from pyspark.sql.types import StructType 

from sklearn.metrics import roc_curve, auc, accuracy_score, adjusted_rand_score, roc_auc_score 

sc = SparkContext() 
sqlContext = SQLContext(sc) 

inData = sc.textFile("hdfs://ML_test_data/train2.csv") 
header = inData.first() 

fields = [StructField(field_name, StringType(), True) for field_name in header.split(',')] 

fields[0].dataType = IntegerType() 
fields[4].dataType = IntegerType() 
fields[7].dataType = IntegerType() 
fields[8].dataType = IntegerType() 
#fields[9].dataType = IntegerType() 
#fields[10].dataType = StructType() 
fields[11].dataType = IntegerType() 

schema = StructType(fields) 

actualData = inData.filter(lambda x: x != header) 

data_temp = actualData.map(lambda x:x.split(',')).map(lambda p:(int(p[0]),p[1], p[2], p[3], int(p[4]), 
                  p[5], p[6], int(p[7]), int(p[8]), p[9], p[10], int(p[11]))) 

df = sqlContext.createDataFrame(data_temp, schema) 
print (df.dtypes) 
''' 
df.createOrReplaceTempView("testTab") 
result_df = sqlContext.sql("select Product_Category_2 from testTab ") 
print (result_df.show) 

result_collect = result_df.collect() 
for i in result_collect: 
    avg_prod_cat2 = i 
    print ("avg_prod_cat2",avg_prod_cat2) 

''' 

# replace empty Product_Category values with the placeholder '2'
def prodCat2(originalCol): 
    if originalCol == '': 
        return '2' 
    else: 
        return originalCol 

udfValProdCt2 = udf(prodCat2, StringType()) 
df = df.withColumn("prod_cat2", udfValProdCt2("Product_Category_2")) 
df = df.withColumn("prod_cat3", udfValProdCt2("Product_Category_3")) 
print (df.dtypes) 
df = df.drop("Product_Category_2") 
df = df.drop("Product_Category_3") 
print (df.dtypes) 
df.show() 
#p_df = df.toPandas() 
#print (p_df.head(15)) 

column_vec_in = ['User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 
       'Product_Category_1', 'Purchase', 'prod_cat2', 'prod_cat3'] 
column_vec_out = ['User_ID_catVec', 'Product_ID_catVec', 'Gender_catVec', 'Age_catVec', 'Occupation_catVec', 'City_Category_catVec', 
        'Stay_In_Current_City_Years_catVec', 'Marital_Status_catVec', 'Product_Category_1_catVec', 
        'Purchase_catVec', 'prod_cat2_catVec', 'prod_cat3_catVec'] 

indexers = [StringIndexer(inputCol=x, outputCol=x +'_tmp') for x in column_vec_in] 

encoders = [OneHotEncoder(dropLast=False, inputCol=x+'_tmp', outputCol=y) for x,y in zip(column_vec_in, column_vec_out)] 

tmp = [[i,j] for i, j in zip(indexers, encoders)] 
tmp = [i for sublist in tmp for i in sublist] 

cols_now = ['Product_ID_catVec', 'Gender_catVec', 'Age_catVec', 'Occupation_catVec', 'City_Category_catVec', 
      'Stay_In_Current_City_Years_catVec', 'Product_Category_1', 'Purchase', 'prod_cat2_catVec', 'prod_cat3_catVec'] 
assembler_features = VectorAssembler(inputCols=cols_now, outputCol='features') 
labelIndexer = StringIndexer(inputCol='Purchase', outputCol="label") 
tmp += [assembler_features, labelIndexer] 

pipeline = Pipeline(stages=tmp) 

pipelineModel = pipeline.fit(df) 

allData = pipelineModel.transform(df) 

allData.cache() 
trainingData, testData = allData.randomSplit([0.7,0.3], seed=4) 
print (trainingData.count()) 
print(testData.count()) 

rf = RF(labelCol='label', featuresCol='features', numTrees=20) 

rfModel= rf.fit(trainingData) 
#print("Coefficients : \n" + str(rfModel.coefficientMatrix)) 
predictions = rfModel.transform(testData) 

predictions.printSchema() 

result = predictions.select('User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 
       'Product_Category_1', 'Purchase', 'prod_cat2', 'prod_cat3', 'label', 'probability', 'prediction').collect() 

# this is the line that fails: collect() returns a plain Python list,
# and a list has no saveAsTextFile attribute
result.saveAsTextFile() 

for row in result: 
    print("User_ID= %s, Product_id= %s -> prob= %s, prediction= %s" %(row.User_ID, row.Product_ID, row.probability, row.prediction)) 

Thanks in advance for your quick help.

Answer


Maybe you have already found the perfect solution, but let me share a short piece of code for writing and reading pyspark DataFrames.

Here is a short snippet to create a DataFrame:

x = [1,2,3,4] 
y = [0,1,0,1] 
df = sc.parallelize([x,y]).toDF(['A', 'B','C','D']) 
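
A quick sanity check on that frame (note that toDF() on an RDD only works once a SQLContext/SparkSession has been created, as in the question's code):

df.printSchema()   # the four columns are inferred as long
df.show()          # two rows: (1, 2, 3, 4) and (0, 1, 0, 1)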

Spark 2.2.x

Write the DataFrame to CSV:

df.write.csv('/tmp/out') 
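
By default write.csv produces a directory of part files with no header row. If you need a header and want to re-run the job over the same path, the writer options cover that. Below is a minimal sketch, assuming Spark 2.x; the DataFrame and output path are made up for illustration. In the question's case, also note that the predictions DataFrame carries vector columns (features, rawPrediction, probability) that the CSV writer cannot serialize, so drop them or cast them to strings before writing.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# toy stand-in for the question's `predictions` DataFrame
out = spark.createDataFrame([(1, 'P001', 1.0), (2, 'P002', 0.0)],
                            ['User_ID', 'Product_ID', 'prediction'])

# header=True writes a header row into each part file;
# mode='overwrite' replaces any previous output directory
out.write.csv('/tmp/out_with_header', header=True, mode='overwrite')

# for the question's DataFrame: drop the vector columns before writing
# predictions.drop('features', 'rawPrediction', 'probability') \
#     .write.csv('hdfs://a/b/x', header=True, mode='overwrite')

If you need a single output file rather than one file per partition, call coalesce(1) on the DataFrame before write.csv.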

Read the CSV back into a DataFrame:

newdf = spark.read.csv("/tmp/out")
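
If the files were written with a header (as in the sketch above), tell the reader about it and let it infer column types; otherwise every column comes back as a string. Again a minimal sketch:

# header=True takes the column names from the first row of each file;
# inferSchema=True makes an extra pass over the data to guess the types
newdf = spark.read.csv('/tmp/out_with_header', header=True, inferSchema=True)
newdf.printSchema()
newdf.show()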