1
我在嘗試將字段從RDD[Array[String]]
轉換爲模式中指定的適當值以轉換爲Spark SQL DataFrame
時遇到了一個奇怪的問題。如何將Array [String]轉換爲適當的模式?
我有一個RDD[Array[String]]
和一個StructType
調用schema
它指定了幾個字段的類型。什麼是迄今爲止我所做的是:
sqlContext.createDataFrame(
inputLines.map(rowValues =>
RowFactory.create(rowValues.zip(schema.toSeq)
.map{ case (value, struct) =>
struct.dataType match {
case BinaryType => value.toCharArray().map(ch => ch.toByte)
case ByteType => value.toByte
case BooleanType => value.toBoolean
case DoubleType => value.toDouble
case FloatType => value.toFloat
case ShortType => value.toShort
case DateType => value
case IntegerType => value.toInt
case LongType => value.toLong
case _ => value
}
})), schema)
,但我得到這個異常:
調用toJSON
方法時
java.lang.RuntimeException: Failed to convert value [Ljava.lang.Object;@6e9ffad1 (class of class [Ljava.lang.Object;}) with the type of IntegerType to JSON
...
你想過原因任何想法爲什麼會發生這種情況,我該如何解決這個問題?
至於問,我們這裏有一個例子:
val schema = StructType(Seq(StructField("id",IntegerType),StructField("val",StringType)))
val inputLines=sc.parallelize(
Array("1","This is a line for testing"),
Array("2","The second line"))
一個樣本輸入端('schema','inputLines')將是有益的。 –
我得到一個異常與'val inputLines = sc.parallelize(Array(「1」,「This is a line for testing」),Array(「2」,「second line」))' - 我認爲它應該是:'val inputLines = sc.parallelize(Array((「1」,「這是一行測試」),(「2」,「第二行」)))' –