
I was migrating my code from Spark 2.0 to 2.1 when I stumbled upon a problem related to saving a DataFrame: Spark 2.1 cannot write a vector field to CSV.

The code below works with Spark 2.0.0:

import org.apache.spark.sql.types._ 
import org.apache.spark.ml.linalg.VectorUDT 
val df = spark.createDataFrame(Seq(Tuple1(1))).toDF("values") 
val toSave = new org.apache.spark.ml.feature.VectorAssembler().setInputCols(Array("values")).transform(df) 
toSave.write.csv(path) 

With Spark 2.1.0.cloudera1, however, the same code fails with the following error:

java.lang.UnsupportedOperationException: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type. 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.org$apache$spark$sql$execution$datasources$csv$CSVFileFormat$$verifyType$1(CSVFileFormat.scala:233) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$verifySchema$1.apply(CSVFileFormat.scala:237) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$verifySchema$1.apply(CSVFileFormat.scala:237) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at org.apache.spark.sql.types.StructType.foreach(StructType.scala:96) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.verifySchema(CSVFileFormat.scala:237) 
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.prepareWrite(CSVFileFormat.scala:121) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) 
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:484) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:520) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198) 
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:579) 
    ... 50 elided 

Is this happening only on my side?

Is this related to the Cloudera build of Spark 2.1? (Judging from their repo, they don't seem to have touched spark.sql, so maybe not.)

Thanks!


This is expected. The CSV source doesn't support complex objects. As the exception tells you: _CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type_. – zero323


Yes, I guessed so, but why does it work with Spark 2.0? –


It doesn't work with 2.0. It used to work with 'spark-csv' in 1.x, where vectors were converted to strings. – zero323

Answers


The following answer is composed from @zero323's comments.

The CSV source does not support complex objects. Exactly as stated in your exception: CSV data source does not support struct<type:tinyint,size:int,indices:array<int>,values:array<double>> data type. This is expected behavior. It does not work with Spark 2.x, though it used to work with spark-csv in 1.x, where vectors were converted to strings.

This behavior is considered correct, as per the following JIRA: SPARK-16216.
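
A minimal sketch of a workaround in that spirit (my own addition, not part of @zero323's comments): stringify the vector column before writing, which mimics what spark-csv did in 1.x. The column name "features" is an assumption; substitute your VectorAssembler's actual output column.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Convert the vector to its string form so the CSV writer only sees
// atomic types ("features" is an assumed column name).
val vecToString = udf((v: Vector) => v.toString)
toSave.withColumn("features", vecToString(col("features"))).write.csv(path)

Note that this is lossy for downstream consumers: reading the CSV back gives you a string, not a vector.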


Just to add: it does work with Spark 2.0.0. It stopped working starting with 2.0.1. –


As a workaround, you can use the VectorDisassembler class from this fork, or take the approach described here.

I have used VectorDisassembler to store the resulting DataFrame of the ml.feature.StandardScaler.fit method into a CSV.

The code looks roughly like this:

// VectorDisassembler comes from the fork mentioned above, not from stock Spark ML.
val disassembler = new org.apache.spark.ml.feature.VectorDisassembler()
// Split the "scaledFeatures" vector into one plain column per element.
val disassembledDF = disassembler.setInputCol("scaledFeatures").transform(df)
disassembledDF.show()
disassembledDF.write.csv(path) // now writable as CSV
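
If you prefer to avoid the fork, a rough equivalent is possible with a plain UDF. This is my own sketch, assuming the vectors all have a known, fixed length n and the column is named "scaledFeatures":

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, lit, udf}

// Extract element i of the vector as a plain double.
val elem = udf((v: Vector, i: Int) => v(i))

val n = 3 // assumed, known vector length
val flattened = (0 until n).foldLeft(df) { (acc, i) =>
  acc.withColumn(s"scaledFeatures_$i", elem(col("scaledFeatures"), lit(i)))
}
flattened.drop("scaledFeatures").write.csv(path)

Each element ends up in its own double column, which the CSV source accepts.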