2017-05-19 187 views
4

我有一個結構化的這樣一個CSV文件:如何在使用PySpark作爲數據框讀取CSV文件時跳過行?

Header 
Blank Row 
"Col1","Col2" 
"1,200","1,456" 
"2,000","3,450" 

我在閱讀本文件中的兩個問題。

  1. 我想忽略頁眉和忽略值內
  2. 的逗號的空白行不是分隔

這裏是我的嘗試:

df = sc.textFile("myFile.csv")\ 
       .map(lambda line: line.split(","))\ #Split By comma 
       .filter(lambda line: len(line) == 2).collect() #This helped me ignore the first two rows 

然而,這是行不通的,因爲值內的逗號被當作分隔符來讀取,並且len(line)返回4而不是2.

我嘗試另一種方法:

data = sc.textFile("myFile.csv") 
headers = data.take(2) #First two rows to be skipped 

的想法是,然後使用濾波器,而不是讀出的標頭。但是,當我試圖打印標題時,我得到了編碼值。

[\x00A\x00Y\x00 \x00J\x00u\x00l\x00y\x00 \x002\x000\x001\x006\x00] 

什麼是讀取CSV文件並跳過前兩行的正確方法?

回答

0

通過Zlidime回答有正確的想法。工作方案是這樣的:

import csv 

customSchema = StructType([ \ 
    StructField("Col1", StringType(), True), \ 
    StructField("Col2", StringType(), True)]) 

df = sc.textFile("file.csv")\ 
     .mapPartitions(lambda partition: csv.reader([line.replace('\0','') for line in partition],delimiter=',', quotechar='"')).filter(lambda line: len(line) > 2 and line[0] != 'Col1')\ 
     .toDF(customSchema) 
4

嘗試使用帶有'quotechar'參數的csv.reader,它會正確拆分該行。 之後,您可以根據需要添加濾鏡。

import csv 
from pyspark.sql.types import StringType 

df = sc.textFile("test2.csv")\ 
      .mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"')).filter(lambda line: len(line)>=2 and line[0]!= 'Col1')\ 
      .toDF(['Col1','Col2']) 
+0

不錯@zlidime! – titipata

+0

csv。讀取器拋出錯誤:行包含空字節 –

+0

我通過調用'csv.reader([l.replace('\ 0','')in line],delimiter =',',quotechar ='''')來修復它。 ' –

4

對於你的第一個問題,只是壓縮與zipWithIndex在RDD的線條和過濾你不想要的線條。 對於第二個問題,您可以嘗試從行刪除第一個和最後一個雙引號字符,然後拆分","上的行。

rdd = sc.textFile("myfile.csv") 
rdd.zipWithIndex(). 
    filter(lambda x: x[1] > 2). 
    map(lambda x: x[0]). 
    map(lambda x: x.strip('"').split('","')). 
    toDF(["Col1", "Col2"]) 

儘管如此,如果你正在尋找對付CSV文件星火一種標準的方式,這是更好地從databricks使用spark-csv包。

+1

Upvoted for your「though」 - 另外,該包不應該與Spark 2一起使用,因爲它已經集成到Spark中,這使得「雖然」更重要,我會強烈建議這樣做因爲這是經典的數據標準化/正則化,它不應該成爲分析管道的一部分,在Spark之外這樣做可以讓你使用自定義工具來完成這項工作,然後擁有一個合適的文件格式,每個人都可以使用 –

0

爲什麼不試試pyspark.sqlDataFrameReader API?這很容易。對於這個問題,我想這條線就足夠了。

df = spark.read.csv("myFile.csv") # By default, quote char is " and separator is ',' 

有了這個API,你也可以玩弄其他一些參數,比如標題行,忽略前後空格。這裏是鏈接:DataFrameReader API

+0

這不允許我跳過 –

+0

你試過用'ignoreLeadingWhiteSpace'或'ignoreTrailingWhiteSpace'設置爲True嗎?我不確定它會工作,但至少,給它一試。 –

+0

並且也嘗試'mode = DROPMALFORMED'。我的假設是,它會認爲空行是腐敗的。 –

1

如果CSV文件的結構總是有兩列,在斯卡拉可以實現:

val struct = StructType(
    StructField("firstCol", StringType, nullable = true) :: 
    StructField("secondCol", StringType, nullable = true) :: Nil) 

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "false") 
    .option("inferSchema", "false") 
    .option("delimiter", ",") 
    .option("quote", "\"") 
    .schema(struct) 
    .load("myFile.csv") 

df.show(false) 

val indexed = df.withColumn("index", monotonicallyIncreasingId()) 
val filtered = indexed.filter(col("index") > 2).drop("index") 

filtered.show(false) 

結果是:

+---------+---------+ 
|firstCol |secondCol| 
+---------+---------+ 
|Header |null  | 
|Blank Row|null  | 
|Col1  |Col2  | 
|1,200 |1,456 | 
|2,000 |3,450 | 
+---------+---------+ 

+--------+---------+ 
|firstCol|secondCol| 
+--------+---------+ 
|1,200 |1,456 | 
|2,000 |3,450 | 
+--------+---------+ 
+0

PySpark允許你也這樣做。這將工作,如果它不是頭。只有標題get被讀入,其他行被跳過。 –

相關問題