spark-csv error on Amazon EMR cluster
I'm trying to run an EMR cluster with a simple Spark step and I'm hitting an error I can't resolve. The program runs fine locally in Eclipse, but fails when run on the EMR cluster. All the program does is convert a CSV file on S3 to Parquet format.
When I run it on EMR, I get the following error:
Caused by: com.univocity.parsers.common.TextParsingException: Length of parsed input (1000001) exceeds the maximum number of characters defined in your parser settings (1000000). Identified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '\n'. Parsed content:
None of my fields come anywhere near the 1,000,000-character limit. I've tried reading the data from s3, s3n, and s3a locations.
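Since no field is actually that long, the parser is most likely treating a large chunk of the file (or the whole file) as a single value because it never sees a line separator it recognizes. One way to test that theory, assuming the spark-csv build on the cluster is recent enough to expose univocity's maxCharsPerColumn setting, is to raise the 1,000,000-character cap and inspect what the parsed rows look like:

    // Hypothetical diagnostic, not part of the original job: raise the
    // univocity per-column character cap so the parse survives long enough
    // to see whether whole lines are being glued into one field.
    val probe = spark.read
      .format("com.databricks.spark.csv")
      .schema(schema)
      .option("maxCharsPerColumn", "10000000")
      .load("s3://mybucket/input/myfile.csv")
    probe.show(5, truncate = false)

If the job then succeeds but yields a handful of enormous rows, the problem is line-ending detection rather than field size. The full program is below.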
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object TestEMR {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Spark Convert to Parquet").getOrCreate()

    // Explicit schema for the 20-column CSV input.
    val schema = StructType(
      Array(
        StructField("field1", StringType, nullable = true),
        StructField("field2", IntegerType, nullable = true),
        StructField("field3", IntegerType, nullable = true),
        StructField("field4", TimestampType, nullable = true),
        StructField("field5", TimestampType, nullable = true),
        StructField("field6", StringType, nullable = true),
        StructField("field7", StringType, nullable = true),
        StructField("field8", StringType, nullable = true),
        StructField("field9", StringType, nullable = true),
        StructField("field10", StringType, nullable = true),
        StructField("field11", StringType, nullable = true),
        StructField("field12", StringType, nullable = true),
        StructField("field13", StringType, nullable = true),
        StructField("field14", StringType, nullable = true),
        StructField("field15", StringType, nullable = true),
        StructField("field16", StringType, nullable = true),
        StructField("field17", StringType, nullable = true),
        StructField("field18", StringType, nullable = true),
        StructField("field19", StringType, nullable = true),
        StructField("field20", StringType, nullable = true)
      )
    )

    // Read the CSV with the external spark-csv package and write Parquet.
    val df = spark.read
      .format("com.databricks.spark.csv")
      .schema(schema)
      .option("nullValue", "")
      .option("treatEmptyValuesAsNulls", "true")
      .load("s3://mybucket/input/myfile.csv")

    df.write.mode("append").parquet("s3://mybucket/output/myfile")

    spark.stop()
  }
}
The file is fine. The load just isn't splitting the file on newlines. I was able to switch the code to sc.textFile(myfile) and it reads the file without a problem. –
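A minimal sketch of that check (hypothetical, reusing the SparkSession from the question):

    // Read the same file as plain text; a plausible line count confirms
    // that Hadoop's text input format is splitting the file on newlines.
    val lines = spark.sparkContext.textFile("s3://mybucket/input/myfile.csv")
    println(lines.count())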
Interesting. FWIW Spark 2 has a built-in CSV parser, which means you could file a JIRA against the Spark team. The s3a integration tests I run are able to work with .csv.gz files on S3 using that module. –
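For reference, a sketch of the same read against Spark 2's built-in CSV source, assuming the cluster actually runs Spark 2.x (schema and paths as in the question; treatEmptyValuesAsNulls is specific to the databricks package, since the built-in source already handles empty values via nullValue):

    // Spark 2+: built-in CSV reader, no external spark-csv package required.
    val df = spark.read
      .schema(schema)
      .option("nullValue", "")
      .csv("s3://mybucket/input/myfile.csv")
    df.write.mode("append").parquet("s3://mybucket/output/myfile")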