
I've run into a strange problem while trying to convert the fields of an RDD[Array[String]] into the proper values specified by a schema, in order to build a Spark SQL DataFrame. How can I convert an Array[String] to match the proper schema?

I have an RDD[Array[String]] and a StructType called schema that specifies the types of several fields. What I have done so far is:

sqlContext.createDataFrame(
    inputLines.map(rowValues =>
        RowFactory.create(rowValues.zip(schema.toSeq)
            .map { case (value, struct) =>
                struct.dataType match {
                    case BinaryType => value.toCharArray().map(ch => ch.toByte)
                    case ByteType => value.toByte
                    case BooleanType => value.toBoolean
                    case DoubleType => value.toDouble
                    case FloatType => value.toFloat
                    case ShortType => value.toShort
                    case DateType => value
                    case IntegerType => value.toInt
                    case LongType => value.toLong
                    case _ => value
                }
            })), schema)

but I get this exception when calling the toJSON method:

java.lang.RuntimeException: Failed to convert value [Ljava.lang.Object;@6e9ffad1 (class of class [Ljava.lang.Object;}) with the type of IntegerType to JSON 

...

Any idea why this happens, and how can I solve the problem?

As requested, here is an example:

val schema = StructType(Seq(StructField("id",IntegerType),StructField("val",StringType))) 
val inputLines=sc.parallelize(
     Array("1","This is a line for testing"), 
     Array("2","The second line")) 

A sample input ('schema', 'inputLines') would be helpful. –

I get an exception with 'val inputLines = sc.parallelize(Array("1","This is a line for testing"),Array("2","The second line"))' - I think it should be: 'val inputLines = sc.parallelize(Array(("1","This is a line for testing"),("2","The second line")))' –

Answer

You are passing the whole Array as a single argument to RowFactory.create.

If you look at its method signature:

public static Row create(Object ... values) 

you can see that it expects a varargs list.

So you just need to expand the array into a varargs list, using the : _* syntax.
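
The difference is easy to see in isolation; a minimal sketch (the sample values here are made up for illustration):

import org.apache.spark.sql.Row

val fields: Array[Any] = Array(1, "This is a line for testing")

// Without : _* the whole array is treated as a single argument,
// so the Row ends up with one column holding an Object[]
val badRow = Row(fields)         // Row([Ljava.lang.Object;@...)

// With : _* each element of the array becomes its own column
val goodRow = Row(fields: _*)    // Row(1, "This is a line for testing")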

sqlContext.createDataFrame(inputLines.map(rowValues =>
    Row(                 // RowFactory.create is the Java API; use Row.apply instead
        rowValues.zip(schema.toSeq)
            .map { case (value, struct) =>
                struct.dataType match {
                    case BinaryType => value.toCharArray().map(ch => ch.toByte)
                    case ByteType => value.toByte
                    case BooleanType => value.toBoolean
                    case DoubleType => value.toDouble
                    case FloatType => value.toFloat
                    case ShortType => value.toShort
                    case DateType => value
                    case IntegerType => value.toInt
                    case LongType => value.toLong
                    case _ => value
                }
            } : _*       // <-- expand the array to varargs here
    )),
    schema)

In the code above, I have replaced RowFactory.create with Row.apply and passed the arguments as varargs.
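
If you prefer to keep RowFactory.create, note that it is a Java API taking Object..., so the Scala values have to be boxed to AnyRef before the varargs expansion; a sketch under that assumption:

import org.apache.spark.sql.RowFactory

// Box every element to AnyRef (java.lang.Object) so the Java varargs overload applies
val row = RowFactory.create(Array[Any](1, "text").map(_.asInstanceOf[AnyRef]): _*)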

Alternatively, use the Row.fromSeq method.
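
Row.fromSeq builds the Row directly from a Scala Seq, so no varargs expansion is needed; for example:

import org.apache.spark.sql.Row

Row.fromSeq(Seq(1, "The second line"))   // equivalent to Row(1, "The second line")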

Refactoring a bit:

// Convert a raw String field to the Scala value matching the field's Spark SQL DataType
def convertTypes(value: String, struct: StructField): Any = struct.dataType match {
    case BinaryType => value.toCharArray().map(ch => ch.toByte)
    case ByteType => value.toByte
    case BooleanType => value.toBoolean
    case DoubleType => value.toDouble
    case FloatType => value.toFloat
    case ShortType => value.toShort
    case DateType => value  // passed through as String; parse to java.sql.Date if DateType is actually used
    case IntegerType => value.toInt
    case LongType => value.toLong
    case _ => value         // StringType and anything unhandled stay as String
}

val schema = StructType(Seq(StructField("id",IntegerType), 
          StructField("val",StringType))) 

val inputLines = sc.parallelize(Array(Array("1","This is a line for testing"), 
             Array("2","The second line"))) 

val rowRdd = inputLines.map{ array => 
    Row.fromSeq(array.zip(schema.toSeq) 
        .map{ case (value, struct) => 
          convertTypes(value, struct) }) 
} 

val df = sqlContext.createDataFrame(rowRdd, schema) 

df.toJSON.collect 
// Array({"id":1,"val":"This is a line for testing"}, 
//  {"id":2,"val":"The second line"}) 