2017-02-04 158 views
0

將新列和新行添加到DataFrame的最佳方式是什麼?
可以同時做到這一點嗎?Spark DataFrame將行添加列

例如,我有一個表像AB:

+------+-------+ 
|  a|  b| 
+------+-------+ 
| true| true|  
| true| false| 
+---+---+------+ 

現在我想添加一個新列「C」,以AB和新行,但前提條件得到滿足。 這個條件應該適用於AB中的每一行,包括c = false和c = true。

foo(row): Boolean是條件和:

foo(Row(true, true, false)) = true 
foo(Row(true, true, true)) = true 
foo(Row(true, false, false)) = true 
foo(Row(true, false, false)) = false 

所以新表ABC應該是這樣的:

+------+-------+-------+ 
    |  a|  b|  c| 
    +------+-------+-------+ 
    | true| true| true|  
    | true| true| false|  
    | true| false| false| 
    +------+-------+-------+ 

我試圖CROSSJOIN和過濾:

val rows = List(Row(true), Row(false)) 

val C = spark.createDataFrame(
    spark.sparkContext.parallelize(rows), 
    StructType(List(StructField("c", BooleanType))) 
) 

val ABC = AB.join(C).filter(r => foo(row)) 

性能是很差不好(你能告訴我爲什麼?)。我也試過flat map:

 val encoder = RowEncoder(AB.schema.add(StructField("c", BooleanType))) 

     val ABC = AB.flatMap { row => 
     Seq(Row.fromSeq(row.toSeq :+ true), Row.fromSeq(row.toSeq :+ false)).filter(r => foo(r)) 
     }(encoder) 

性能也不好。鑄造大型表格需要很長的時間。正如我注意到的那樣,鑄件被應用在了肌肉節點上。對於大型表格(百萬行)它表現不佳。

你對這個問題還有其他更好的解決方案嗎?

順便說一句,我使用Apache Spark 2.0.1與Scala。

+0

我真的不明白你在問什麼。而對於記錄交叉連接總是會有不好的表現,除非你正在使用像LSH這樣的哈希技術。 – eliasah

+0

我想展開一個新列和新行的布爾表。我的舊錶可以有2^n行和新表2 ^(n + 1)行(n = |列|)。對於大的n,有很多行。所以我想用函數「foo」來過濾一些行。 –

回答

1

我認爲你比它需要的,從我個人理解,下面應該如果你做一個frame.show它應該表現出你屈服後

val stuff = List[Row](Row(true, true),Row(true, false),Row(false, true), Row(false, false)) 
val rows = sc.parallelize(stuff) 
val schema = StructType(StructField("a", BooleanType, true) :: StructField("b", BooleanType, true) :: Nil) 
val frame = spark.createDataFrame(rows, schema).withColumn("c", col("a")&&(col("b"))) 

然後是結果更復雜

+-----+-----+-----+ 
| a| b| c| 
+-----+-----+-----+ 
| true| true| true| 
| true|false|false| 
|false| true|false| 
|false|false|false| 
+-----+-----+-----+