
I want to save the file below in CSV format in Spark (Python). File:

44,8602,37.19
35,5368,65.89
2,3391,40.64
47,6694,14.98
29,680,13.08

import csv
import StringIO

inputa = sc.textFile("file:///home/kumar/customer-orders.csv")

def writerecord(line):
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["a", "b", "c"],
                            extrasaction='ignore', delimiter=',')
    for record in line:
        writer.writerow(record)
    return [output.getvalue()]

inputa.mapPartitions(writerecord).saveAsTextFile("file:///home/godffddf2888/resultcx12")

I am getting the following error:

Error process()
File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
File "<stdin>", line 5, in writerecord
File "/usr/lib64/python2.7/csv.py", line 148, in writerow
    return self.writer.writerow(self._dict_to_list(rowdict))
File "/usr/lib64/python2.7/csv.py", line 145, in _dict_to_list
    return [rowdict.get(key, self.restval) for key in self.fieldnames]
AttributeError: 'unicode' object has no attribute 'get'
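The traceback points at the actual problem: csv.DictWriter.writerow expects a dict per row, but mapPartitions hands writerecord an iterator of raw text lines (unicode strings), so rowdict.get(...) fails. A minimal sketch of a fix along the original lines, assuming each input line is a plain "a,b,c" string, with the plain csv.writer swapped in for DictWriter (Python 2, matching the traceback):

import csv
import StringIO

def writerecord(lines):
    # one partition arrives as an iterator of raw text lines
    output = StringIO.StringIO()
    writer = csv.writer(output, delimiter=',')
    for line in lines:
        writer.writerow(line.split(','))  # split the string into fields first
    return [output.getvalue()]

# inputa.mapPartitions(writerecord).saveAsTextFile("file:///home/godffddf2888/resultcx12")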

Answers


I hope this code does what you want:

import csv
from pyspark import SparkContext

sc = SparkContext()

inputa = sc.textFile("./customer-orders.csv")

def toCSVlines(data):
    # helper for serializing one row back to a CSV text line
    return ','.join(str(i) for i in data)

columns_name = [["a","b","c"]] 
header = sc.parallelize(columns_name) # RDD with columns name 

rdd = inputa.mapPartitions(lambda x: csv.reader(x)) # read the data of your input .csv file 
print (rdd.collect()) 
# [['44', '8602', '37.19'], ['35', '5368', '65.89'], ['2', '3391', '40.64'], ['47', '6694', '14.98'], ['29', '680', '13.08']] 

rdd_tot = header.union(rdd) # create a single RDD with column names and values 

print (rdd_tot.collect()) 
# [['a', 'b', 'c'], ['44', '8602', '37.19'], ['35', '5368', '65.89'], ['2', '3391', '40.64'], ['47', '6694', '14.98'], ['29', '680', '13.08']] 

rdd_tot = rdd_tot.map(lambda line: toCSVlines(line)) 
rdd_tot.coalesce(1).saveAsTextFile("/whereverYouWant") 
# you will have something like this 
# a,b,c 
# 44,8602,37.19 
# 35,5368,65.89 
# 2,3391,40.64 
# 47,6694,14.98 
# 29,680,13.08 
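One caveat with toCSVlines: a bare ','.join does not quote fields that themselves contain commas or newlines. If that matters for your data, a quoting-safe variant using the standard csv module might look like this (the helper name toCSVline_safe is illustrative, not from the original answer):

import csv
import io

def toCSVline_safe(data):
    # let csv.writer handle quoting/escaping instead of a bare join
    output = io.StringIO()
    csv.writer(output).writerow([str(i) for i in data])
    return output.getvalue().rstrip('\r\n')

# rdd_tot.map(toCSVline_safe).coalesce(1).saveAsTextFile("/whereverYouWant")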

To read a CSV file into a DataFrame (and add column names at the same time):

df = spark.read.csv('my_csv_file.csv').toDF('cola', 'colb', 'colc')
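If you want typed columns rather than everything read as strings, you can pass an explicit StructType instead; a sketch, with the types guessed from the sample data:

from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

schema = StructType([
    StructField('cola', IntegerType()),
    StructField('colb', IntegerType()),
    StructField('colc', DoubleType()),
])
df = spark.read.csv('my_csv_file.csv', schema=schema)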

You can manipulate the data on the DataFrame, and once you are done with it you can write it back out in CSV format:

df.write.csv(path='output_file.csv', mode='overwrite')
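
To include the column names in the output files as well, DataFrameWriter.csv accepts a header option:

df.write.csv(path='output_file.csv', mode='overwrite', header=True)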