2017-06-06 63 views
3

我正在使用Apache Spark 2.0數據框/數據集API 我想從值列表向我的數據框添加一個新列。我的列表與給定的數據幀具有相同數量的值。Apache Spark如何將新列從列表/數組添加到Spark數據框

val list = List(4,5,10,7,2) 
val df = List("a","b","c","d","e").toDF("row1") 

我想這樣做:

val appendedDF = df.withColumn("row2",somefunc(list)) 
df.show() 
// +----+------+ 
// |row1 |row2 | 
// +----+------+ 
// |a |4 | 
// |b |5 | 
// |c |10 | 
// |d |7 | 
// |e |2 | 
// +----+------+ 

對於任何想法,我將不勝感激,我在現實中數據幀中包含多個列。作爲DataFrame表明,這是一個小的數據幀的事實,(它存在於駕駛員記憶)輸入list具有相同的大小:

+0

如果列表和DF大小不同,會發生什麼?僅包含較大集合中的前N個項目(其中N =較短集合的大小)? –

+0

在我的情況下,我知道它將永遠是相同的長度 –

+0

您也可以將列表轉換爲數據幀。然後將row_number添加到它們並按row_number進行連接。 –

回答

5

你可以做這樣的:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types._  

// create rdd from the list 
val rdd = sc.parallelize(List(4,5,10,7,2)) 
// rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:28 

// zip the data frame with rdd 
val rdd_new = df.rdd.zip(rdd).map(r => Row.fromSeq(r._1.toSeq ++ Seq(r._2))) 
// rdd_new: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[33] at map at <console>:32 

// create a new data frame from the rdd_new with modified schema 
spark.createDataFrame(rdd_new, df.schema.add("new_col", IntegerType)).show 
+----+-------+ 
|row1|new_col| 
+----+-------+ 
| a|  4| 
| b|  5| 
| c|  10| 
| d|  7| 
| e|  2| 
+----+-------+ 
4

添加的完整性首先 - 所以你可能會考慮collect() -ing它與list荏苒,如果需要轉換回一個DataFrame

df.collect() 
    .map(_.getAs[String]("row1")) 
    .zip(list).toList 
    .toDF("row1", "row2") 

這不會是快,但如果數據非常小,可能可以忽略不計,代碼(可以說)更清晰。

+1

我真的很喜歡這個答案,我認爲對於小數據集來說它是完全可行的 –

相關問題