2016-09-21 75 views
0

我試圖通過從hadoop集羣獲取.csv數據並將其放入Pandas DataFrame來創建Spark工作流。我能夠從HDFS中提取數據並將其放入RDD中,但無法將其處理到Pandas Dataframe中。 以下是我的代碼:用.csv格式的HDFS文件創建熊貓數據框

import pandas as pd 
import numpy as nm 

A=sc.textFile("hdfs://localhost:9000/sales_ord_univ.csv") # this creates the RDD 
B=pd.DataFrame(A) # this gives me the following error:pandas.core.common.PandasError: DataFrame constructor not properly called! 

我敢肯定,這個錯誤是這樣的,由於RDD是一個大單名單, 因此,我試圖通過拆分數據「;」(即每新行是不同的字符串) 但這似乎也沒有幫助。

我的總體目標是使用Pandas將CSV更改爲JSON並輸出到MongoDB。我已經使用DictReader,PysparkSQL完成了這個項目,但是想要檢查是否可以使用Pandas。

任何幫助,將不勝感激 謝謝!

回答

1

我建議將csv加載到Spark DataFrame中並將其轉換爲Pandas DataFrame。

csvDf = sqlContext.read.format("csv").option("header", "true").option("inferschema", "true").option("mode", "DROPMALFORMED").load("hdfs://localhost:9000/sales_ord_univ.csv") 
B = csvDf.toPandas() 

如果仍然使用一個Spark版本< 2.0,則必須使用read.format("com.databricks.spark.csv")和包括com.databricks.spark.csv包(例如與--packages參數使用pyspark殼時)。

0

你需要hdfs (2.0.16)

from hdfs import Config 
zzodClient = Config().get_client('zzod') #refer to the docs to set up config 
with zzodClient.read(q2Path) as r2Reader: 
    r2 = pandas.read_csv(r2Reader)