1

我已經產生了數據幀如下:SPARK數據幀:如何高效地拆分數據幀爲根據同一列中的每一組值

df.groupBy($"Hour", $"Category") 
    .agg(sum($"value").alias("TotalValue")) 
    .sort($"Hour".asc,$"TotalValue".desc)) 

結果如下所示:

+----+--------+----------+ 
|Hour|Category|TotalValue| 
+----+--------+----------+ 
| 0| cat26|  30.9| 
| 0| cat13|  22.1| 
| 0| cat95|  19.6| 
| 0| cat105|  1.3| 
| 1| cat67|  28.5| 
| 1| cat4|  26.8| 
| 1| cat13|  12.6| 
| 1| cat23|  5.3| 
| 2| cat56|  39.6| 
| 2| cat40|  29.7| 
| 2| cat187|  27.9| 
| 2| cat68|  9.8| 
| 3| cat8|  35.6| 
| ...| ....|  ....| 
+----+--------+----------+ 

我想根據每個唯一值col("Hour")創建新的數據幀,即

  • 爲小時組===
  • 爲組小時== 1
  • 的該組小時== 2 等等...

因此所需的輸出將是:

df0 as: 

+----+--------+----------+ 
|Hour|Category|TotalValue| 
+----+--------+----------+ 
| 0| cat26|  30.9| 
| 0| cat13|  22.1| 
| 0| cat95|  19.6| 
| 0| cat105|  1.3| 
+----+--------+----------+ 

df1 as: 
+----+--------+----------+ 
|Hour|Category|TotalValue| 
+----+--------+----------+ 
| 1| cat67|  28.5| 
| 1| cat4|  26.8| 
| 1| cat13|  12.6| 
| 1| cat23|  5.3| 
+----+--------+----------+ 

和同樣,

df2 as: 

+----+--------+----------+ 
|Hour|Category|TotalValue| 
+----+--------+----------+ 
| 2| cat56|  39.6| 
| 2| cat40|  29.7| 
| 2| cat187|  27.9| 
| 2| cat68|  9.8| 
+----+--------+----------+ 

任何幫助,高度讚賞。

編輯1:

我曾嘗試:

df.foreach(
    row => splitHour(row) 
) 

def splitHour(row: Row) ={ 
    val Hour=row.getAs[Long]("Hour") 

    val HourDF= sparkSession.createDataFrame(List((s"$Hour",1))) 

    val hdf=HourDF.withColumnRenamed("_1","Hour_unique").drop("_2") 

    val mydf: DataFrame =df.join(hdf,df("Hour")===hdf("Hour_unique")) 

    mydf.write.mode("overwrite").parquet(s"/home/dev/shaishave/etc/myparquet/$Hour/") 
    } 

問題,此策略:

它花了8小時,此時它是在一個數據幀df其中有運行超過100萬行,單個節點上的閃存作業大約10 GB RAM。因此,join的效率非常高。

警告:我必須將每個數據幀mydf寫入嵌套模式,需要維護(未展平)。

+0

你可以做'df.write.partitionBy(「hour」)。saveAsTable(「myparquet」)'做到這一點嗎? –

+0

@DennyLee 謝謝,這比我的戰略快了60倍! 但它保存了像'hour = 0','hour = 1'等名稱的結果文件,我想把文件保存爲'0','1'等。 請問您如何實現這一目標的見解? –

+0

您可以將hiveContext與'hive.dynamic.partitioning.custom.pattern'配置一起使用,但保留爲'hour = 0','hour = 1'等的一個優點是,當您'重新運行'spark.read.parquet(...)'它會自動理解底層的動態分區。另一種可能的方法是之後重命名文件夾(即使用'mv'命令),但仍會遇到同樣的問題,即read.parquet不會自動理解動態分區。 –

回答

1

正如我在評論中指出,一個潛在的簡單的方法來此問題將用於:

df.write.partitionBy("hour").saveAsTable("myparquet") 

如上所述,文件夾結構將是myparquet/hour=1myparquet/hour=2,...,myparquet/hour=24而不是myparquet/1,myparquet/2,...,myparquet/24

要更改文件夾結構,你可以

  1. 潛在使用Hive配置明確HiveContext內設置hcat.dynamic.partitioning.custom.pattern;更多的信息在HCatalog DynamicPartitions
  2. 另一種方法是在執行df.write.partitionBy.saveAsTable(...)命令之後直接更改文件系統,例如for f in *; do mv $f ${f/${f:0:5}/} ; done,該命令將從文件夾名稱中刪除Hour=文本。

要注意,通過更改文件夾,當你在該文件夾中運行spark.read.parquet(...)命名模式,星火不會自動理解,因爲其缺少partitionKey(即Hour)信息的動態分區是很重要的。

2
+0

此解決方案,我已經嘗試: [我怎樣才能將數據幀拆分成SCALA和SPARK中具有相同列值的數據框](http://stackoverflow.com/questions/31669308/how-can-i-split-a - 數據幀到數據幀中 - 同列值 - 在 - 斯卡拉 - 和) 'val byStateArray:Array [Any] = cnts.map(c => sorted.where($「cnt」<=> c) )' 2這個問題: i)它返回Array [Any]與期望的數據幀。 ii)顯示錯誤:'無法解決<=>' –

相關問題