2017-01-24 154 views
0

我正在嘗試運行帶有簡單Spark步驟執行的EMR集羣,並遇到無法解析的錯誤。該程序在Eclipse中本地運行時運行,但在EMR羣集上運行時無法運行。該程序只是試圖將S3上的CSV文件轉換爲Parquet格式。Amazon EMR集羣上的spark-csv錯誤

當我在EMR運行,我得到以下錯誤:

Caused by: com.univocity.parsers.common.TextParsingException: Length of parsed input (1000001) exceeds the maximum number of characters defined in your parser settings (1000000). Identified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '\n'. Parsed content:

我沒有超過1000000限制的任何領域。我曾嘗試從s3,s3n和s3a位置讀取數據。

import org.apache.spark.SparkSession 
    import org.apache.spark.sql.types._ 

    object TestEMR { 
     def main(args: Array[String]) { 
     val spark = SparkSession.builder().appName("Spark Convert to Parquet").getOrCreate() 
     val schema = StructType(
      Array(
       StructField("field1", StringType ,nullable = true), 
       StructField("field2", IntegerType ,nullable = true), 
       StructField("field3", IntegerType ,nullable = true), 
       StructField("field4", TimestampType ,nullable = true), 
       StructField("field5", TimestampType ,nullable = true), 
       StructField("field6", StringType ,nullable = true), 
       StructField("field7", StringType ,nullable = true), 
       StructField("field8", StringType ,nullable = true), 
       StructField("field9", StringType ,nullable = true), 
       StructField("field10", StringType ,nullable = true), 
       StructField("field11", StringType ,nullable = true), 
       StructField("field12", StringType ,nullable = true), 
       StructField("field13", StringType ,nullable = true), 
       StructField("field14", StringType ,nullable = true), 
       StructField("field15", StringType ,nullable = true), 
       StructField("field16", StringType ,nullable = true), 
       StructField("field17", StringType ,nullable = true), 
       StructField("field18", StringType ,nullable = true), 
       StructField("field19", StringType ,nullable = true), 
       StructField("field20", StringType ,nullable = true) 
      ) 
     ) 

     val df = spark.read 
      .format("com.databricks.spark.csv") 
      .schema(schema) 
      .option("nullValue","") 
      .option("treatEmptyValuesAsNulls","true") 
      .load("s3://mybucket/input/myfile.csv") 
     df.write.mode("append").parquet("s3://mybucket/output/myfile") 
     spark.stop 
     } 
    } 

回答

0

聽起來像它沒有找到行結束,所以不斷讀入,直到它在單行上達到10K字符的限制。

正如他們所說:檢查文件的換行符

+0

文件是罰款。加載操作只是不將文件拆分爲換行符。我能夠將代碼轉換爲sc.texfile(myfile),並且它可以正常讀取文件。 –

+0

有趣。 FWIW Spark 2具有內置的CSV解析器,這意味着您可以針對火花團隊提交JIRA文件。我運行的s3a集成測試可以使用該模塊在s3上使用.csv.gz文件 –

0

這一問題仍然在spark-csv jira開放。如果您沒有數據問題或讀取爲RDD,然後創建數據框,則他們提供了使用open csv解析器的解決方法。

val rdd = sc.textFile("file.csv") 
// Here, filtering or transformation 
//val filteredRDD = rdd.filter.. 
//val transformedRDD = rdd.map.. 

val df = new CsvParser().csvRdd(sqlContext, transformedRDD)