2016-09-02 65 views
1

我想分裂一個大木文件到HDFS不同的文件夾中的多個文件,實木複合地板,這樣我就可以建立分區表(無論蜂巢/鑽頭/星火SQL)在上面。分裂一個大木文件分成多個文件,實木複合地板用鑰匙

數據例如:

+-----+------+ 
|model| num1| 
+-----+------+ 
| V80| 195.0| 
| V80| 750.0| 
| V80| 101.0| 
| V80| 0.0| 
| V80| 0.0| 
| V80| 720.0| 
| V80|1360.0| 
| V80| 162.0| 
| V80| 150.0| 
| V90| 450.0| 
| V90| 189.0| 
| V90| 400.0| 
| V90| 120.0| 
| V90| 20.3| 
| V90| 0.0| 
| V90| 84.0| 
| V90| 555.0| 
| V90| 0.0| 
| V90| 9.0| 
| V90| 75.6| 
+-----+------+ 

結果文件夾結構應以 「模型」 字段進行分組:

+ 
| 
+-----model=V80 
|  | 
|  +----- XXX.parquet 
+-----model=V90 
|  | 
|  +----- XXX.parquet 

我想這樣的腳本:

def main(args: Array[String]): Unit = { 
    val conf = new SparkConf() 
    case class Infos(name:String, name1:String) 
    val sc = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val rdd = sqlContext.read.load("hdfs://nameservice1/user/hive/warehouse/a_e550_parquet").select("model", "num1").limit(10000) 

    val tmpRDD = rdd.map { item => (item(0), Infos(item.getString(0), item.getString(1))) }.groupByKey() 

    for (item <- tmpRDD) { 
     import sqlContext.implicits._ 
     val df = item._2.toSeq.toDF() 
     df.write.mode(SaveMode.Overwrite).parquet("hdfs://nameservice1/tmp/model=" + item._1) 
    } 
    } 

只是把出一個零點例外。

回答

1

你應該從數據框中使用partitionBy。你不需要groupBy。像下面的東西應該給你想要的。

val df = sqlContext.read.parquet("hdfs://nameservice1/user/hive/warehouse/a_e550_parquet").select("model", "num1").limit(10000) 
df.write.partitionBy("model").mode(SaveMode.Overwrite) 
+0

你是正確的,我們嘗試這個。但看起來非常緩慢,大約有5億條記錄。有沒有什麼有效的方法? –

+0

可能有很多原因,這是緩慢的。有些你必須檢查的,這個事情是工作洗牌太多了,(看是否可以增加/減少洗牌分區大小),如果有數據傾斜等,這是很難在不知道數據的話。你是檢查這個問題的合適人選:-) – Jegan

相關問題