2015-04-29 36 views
12

我在Spark上使用python,並希望將csv獲取到數據框中。Get CSV到Spark數據框

Spark SQL的documentation奇怪地不提供CSV作爲源的解釋。

我發現Spark-CSV,但是我有問題與文件兩個部分組成:

  • "This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3" 我真的需要補充這樣的說法,每次我推出pyspark或火花提交?看起來很不雅。是不是有辦法導入它在python而不是每次redownloading它?

  • df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv")即使我這樣做,這將無法正常工作。在這行代碼中,「源」參數代表什麼?我如何簡單地在Linux上加載本地文件,如「/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv」?

回答

11

將csv文件讀入RDD,然後從原始RDD生成RowRDD。

創建由匹配行中,在步驟1

創建的RDD結構通過由SQLContext提供createDataFrame方法應用模式以行的RDD一個StructType表示的架構。

lines = sc.textFile("examples/src/main/resources/people.txt") 
parts = lines.map(lambda l: l.split(",")) 
# Each line is converted to a tuple. 
people = parts.map(lambda p: (p[0], p[1].strip())) 

# The schema is encoded in a string. 
schemaString = "name age" 

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] 
schema = StructType(fields) 

# Apply the schema to the RDD. 
schemaPeople = spark.createDataFrame(people, schema) 

來源:SPARK PROGRAMMING GUIDE

+0

這個答案是舊的,火花的新版本有更簡單的方法來實現這一目標。參考答案https://stackoverflow.com/a/41638342/187355和https://stackoverflow.com/a/46539901/187355 –

20
from pyspark.sql.types import StringType 
from pyspark import SQLContext 
sqlContext = SQLContext(sc) 

Employee_rdd = sc.textFile("\..\Employee.csv") 
       .map(lambda line: line.split(",")) 

Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name']) 

Employee_df.show() 
+0

這個答案有幾個upvotes,但它不完全清楚發生了什麼事 - 你做SQLContext (sc)並調用sqlContext,那麼你不用做任何事情..是隻是無關的代碼? 當我嘗試在一個飛艇筆記本一個簡單的csv文件相同的代碼我得到的錯誤: '''回溯(最近通話最後一個): 文件「/tmp/zeppelin_pyspark-7664300769638364279.py」,行252在 的eval(compiledCode) 文件 「」,1號線,在 AttributeError的: '詮釋' 對象有沒有屬性「map'''' – tamale

+0

請分享您的代碼,以獲得幫助。我已經在幾個實例中使用了代碼,我沒有問題 –

0

我遇到了類似的問題。解決方案是添加一個名爲「PYSPARK_SUBMIT_ARGS」的環境變量,並將其值設置爲「--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell」。這適用於Spark的Python交互式shell。

確保您將spark-csv的版本與安裝的Scala版本相匹配。在Scala 2.11中,它是spark-csv_2.11,Scala 2.10或2.10.5是spark-csv_2.10。

希望它有效。

8

如果您不介意額外的軟件包依賴關係,可以使用Pandas來解析CSV文件。它處理內部逗號就好了。

依賴關係:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 

閱讀整個文件一旦進入一個Spark數據幀:

sc = SparkContext('local','example') # if using locally 
sql_sc = SQLContext(sc) 

pandas_df = pd.read_csv('file.csv') # assuming the file contains a header 
# If no header: 
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) 
s_df = sql_sc.createDataFrame(pandas_df) 

或者,甚至更多的數據,自覺地,你可以塊中的數據轉換成星火RDD然後DF :

chunk_100k = pd.read_csv('file.csv', chunksize=100000) 

for chunky in chunk_100k: 
    Spark_temp_rdd = sc.parallelize(chunky.values.tolist()) 
    try: 
     Spark_full_rdd += Spark_temp_rdd 
    except NameError: 
     Spark_full_rdd = Spark_temp_rdd 
    del Spark_temp_rdd 

Spark_DF = Spark_full_rdd.toDF(['column 1','column 2']) 
+0

createDataFrame經常給出和這樣的錯誤:IllegalArgumentException:「實例化'org.apache.spark.sql.hive.HiveSessionState'時出錯:」...任何遇到命中這個? – mathtick

6

繼Spark 2.0之後,推薦使用Spark會話:

from pyspark.sql import SparkSession 
from pyspark.sql import Row 

# Create a SparkSession 
spark = SparkSession \ 
    .builder \ 
    .appName("basic example") \ 
    .config("spark.some.config.option", "some-value") \ 
    .getOrCreate() 

def mapper(line): 
    fields = line.split(',') 
    return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3])) 

lines = spark.sparkContext.textFile("file.csv") 
df = lines.map(mapper) 

# Infer the schema, and register the DataFrame as a table. 
schemaDf = spark.createDataFrame(df).cache() 
schemaDf.createOrReplaceTempView("tablename") 
6

隨着更新版本的Spark(我相信,1.4),這變得更容易。表達sqlContext.read給你一個DataFrameReader實例,具有.csv()方法:

df = sqlContext.read.csv("/path/to/your.csv") 

請注意,您也可以表明,csv文件通過添加關鍵字參數header=True.csv()通話有一個頭。其他一些選項可用,並在上面的鏈接中進行了描述。

0

根據Aravind的回答,但要短得多,例如, :

lines = sc.textFile("/path/to/file").map(lambda x: x.split(",")) 
df = lines.toDF(["year", "month", "day", "count"]) 
2

爲Pyspark,假設csv文件的第一行包含標題

spark = SparkSession.builder.appName('chosenName').getOrCreate() 
df=spark.read.csv('fileNameWithPath', mode="DROPMALFORMED",inferSchema=True, header = True)