2017-07-10 125 views
1

我有一個RDD[List[Int]],我不知道list[Int]的計數,我想轉換我Rdd[List[Int]]DataFrame,我應該怎麼做?如何將RDD [List [Int]]轉換爲DataFrame?

這是我輸入:

val l1=Array(1,2,3,4) 
    val l2=Array(1,2,3,4) 
    val Lz=Seq(l1,l2) 
    val rdd1=sc.parallelize(Lz,2) 

這是我期待的結果是:

+---+---+---+---+ 
| _1| _2| _3| _4| 
+---+---+---+---+ 
| 1| 2| 3| 4| 
| 1| 2| 3| 4| 
+---+---+---+---+ 
+0

你不知道l1'和'l2'的'計數,但他們將永遠是相同的長度? – philantrovert

+0

l1和l2的計數是相同的,但列表[Int]的計數並不一定是相同的長度。 – mentongwu

回答

0

您可以採用如下方案

val rdd1=sc.parallelize(Lz,2).map(array => (array(0), array(1), array(2), array(3))).toDF() 
rdd1.show(false) 

它應該給你輸出

+---+---+---+---+ 
|_1 |_2 |_3 |_4 | 
+---+---+---+---+ 
|1 |2 |3 |4 | 
|1 |2 |3 |4 | 
+---+---+---+---+ 
+0

我不知道列表數[Int] – mentongwu

1

@拉梅什的答案是正確的,但你也可以做到以下幾點:

val l1=Array(1,2,3,4) 
val l2=Array(1,2,3,4) 
val Lz=Seq(l1,l2) 
val df = sc.parallelize(Lz,2).map{ 
    case Array(val1, val2, val3, val4) => (val1, val2, val3, val4) 
}.toDF 

df.show 
// +---+---+---+---+ 
// | _1| _2| _3| _4| 
// +---+---+---+---+ 
// | 1| 2| 3| 4| 
// | 1| 2| 3| 4| 
// +---+---+---+---+ 

如果你有大量的列,您需要進行不同,但你需要知道,否則你的數據的模式,你會無法執行以下操作:

val sch = df.schema // I just took the schema from the old df but you can add one programmatically 

val df2 = spark.createDataFrame(sc.parallelize(Lz,2).map{ Row.fromSeq(_) }, sch) 

df2.show 
// +---+---+---+---+ 
// | _1| _2| _3| _4| 
// +---+---+---+---+ 
// | 1| 2| 3| 4| 
// | 1| 2| 3| 4| 
// +---+---+---+---+ 

除非你提供一個模式,你將無法做太多,除了具有陣列列:

val df3 = sc.parallelize(Lz,2).toDF 
// df3: org.apache.spark.sql.DataFrame = [value: array<int>] 
df3.show 
// +------------+ 
// |  value| 
// +------------+ 
// |[1, 2, 3, 4]| 
// |[1, 2, 3, 4]| 
// +------------+ 
df3.printSchema 
//root 
// |-- value: array (nullable = true) 
// | |-- element: integer (containsNull = false) 
+0

我不是列表數[Int],所以我不能得到模式 – mentongwu

+0

l1和l2的計數是相同的,但列表[Int]的計數不是阿爾瓦里的長度相同。 – mentongwu

+0

你不能那樣做。除非你提供一個模式,你只需要一個條目數組 – eliasah

1

可能有一些其他的更好的功能性的方式來做到這一點,但這個工程太:

def getSchema(myArray : Array[Int]): StructType = { 
    var schemaArray = scala.collection.mutable.ArrayBuffer[StructField]() 
    for((el,idx) <- myArray.view.zipWithIndex){ 
     schemaArray += StructField("col"+idx , IntegerType, true) 
    } 
    StructType(schemaArray) 
} 

val l1=Array(1,2,3,4) 
val l2=Array(1,2,3,4) 
val Lz=Seq(l1,l2) 
val rdd1=sc.parallelize(Lz,2).map(Row.fromSeq(_)) 
val schema = getSchema(l1) //Since both arrays will be of same type and size 
val df = sqlContext.createDataFrame(rdd1, schema) 
df.show() 

+----+----+----+----+ 
|col0|col1|col2|col3| 
+----+----+----+----+ 
| 1| 2| 3| 4| 
| 1| 2| 3| 4| 
+----+----+----+----+ 
相關問題