4

我想根據製造商列內容將數據集拆分成不同的數據集。這是非常緩慢的
請建議一種方法來改進代碼,以便它可以更快地執行並減少Java代碼的使用。
根據火花中的列值拆分數據集

List<Row> lsts= countsByAge.collectAsList(); 

     for(Row lst:lsts){ 
      String man=lst.toString(); 
      man = man.replaceAll("[\\p{Ps}\\p{Pe}]", ""); 
      Dataset<Row> DF = src.filter("Manufacturer='"+man+"'"); 
      DF.show(); 

     } 

守則,輸入和輸出數據集如下所示。

package org.sparkexample; 
    import org.apache.parquet.filter2.predicate.Operators.Column; 
    import org.apache.spark.SparkConf; 
    import org.apache.spark.api.java.JavaSparkContext; 
    import org.apache.spark.sql.Dataset; 
    import org.apache.spark.sql.RelationalGroupedDataset; 
    import org.apache.spark.sql.Row; 
    import org.apache.spark.sql.SQLContext; 
    import org.apache.spark.sql.SparkSession; 

    import java.util.Arrays; 
    import java.util.List; 

    import org.apache.spark.api.java.JavaPairRDD; 
    import org.apache.spark.api.java.JavaRDD; 
      public class GroupBy { 

       public static void main(String[] args) { 
        System.setProperty("hadoop.home.dir", "C:\\winutils"); 
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]")); 
        SQLContext sqlContext = new SQLContext(sc); 
        SparkSession spark = SparkSession.builder().appName("split datasets").getOrCreate(); 
        sc.setLogLevel("ERROR"); 

        Dataset<Row> src= sqlContext.read() 
           .format("com.databricks.spark.csv") 
           .option("header", "true") 
           .load("sample.csv"); 


        Dataset<Row> unq_manf=src.select("Manufacturer").distinct(); 
        List<Row> lsts= unq_manf.collectAsList(); 

        for(Row lst:lsts){ 
         String man=lst.toString(); 
         man = man.replaceAll("[\\p{Ps}\\p{Pe}]", ""); 
         Dataset<Row> DF = src.filter("Manufacturer='"+man+"'"); 
         DF.show(); 

       } 
      } 
     } 

     INPUT TABLE- 
     +------+------------+--------------------+---+ 
     |ItemID|Manufacturer|  Category name|UPC| 
     +------+------------+--------------------+---+ 
     | 804|   ael|Brush & Broom Han...|123| 
     | 805|   ael|Wheel Brush Parts...|124| 
     | 813|   ael|  Drivers Gloves|125| 
     | 632|  west|  Pipe Wrenches|126| 
     | 804|   bil|  Masonry Brushes|127| 
     | 497|  west| Power Tools Other|128| 
     | 496|  west| Power Tools Other|129| 
     | 495|   bil|   Hole Saws|130| 
     | 499|   bil| Battery Chargers|131| 
     | 497|  west| Power Tools Other|132| 
     +------+------------+--------------------+---+ 

     OUTPUT- 
     +------------+ 
     |Manufacturer| 
     +------------+ 
     |   ael| 
     |  west| 
     |   bil| 
     +------------+ 

     +------+------------+--------------------+---+ 
     |ItemID|Manufacturer|  Category name|UPC| 
     +------+------------+--------------------+---+ 
     | 804|   ael|Brush & Broom Han...|123| 
     | 805|   ael|Wheel Brush Parts...|124| 
     | 813|   ael|  Drivers Gloves|125| 
     +------+------------+--------------------+---+ 

     +------+------------+-----------------+---+ 
     |ItemID|Manufacturer| Category name|UPC| 
     +------+------------+-----------------+---+ 
     | 632|  west| Pipe Wrenches|126| 
     | 497|  west|Power Tools Other|128| 
     | 496|  west|Power Tools Other|129| 
     | 497|  west|Power Tools Other|132| 
     +------+------------+-----------------+---+ 

     +------+------------+----------------+---+ 
     |ItemID|Manufacturer| Category name|UPC| 
     +------+------------+----------------+---+ 
     | 804|   bil| Masonry Brushes|127| 
     | 495|   bil|  Hole Saws|130| 
     | 499|   bil|Battery Chargers|131| 
     +------+------------+----------------+---+ 

謝謝

+0

我也有同樣的問題,因此upvoting。 –

回答

1

你有兩個選擇,在這種情況下:

  1. 首先你必須收集獨特的出廠值,然後通過產生的陣圖 :

    val df = Seq(("HP", 1), ("Brother", 2), ("Canon", 3), ("HP", 5)).toDF("k", "v")  
    val brands = df.select("k").distinct.collect.flatMap(_.toSeq) 
    val BrandArray = brands.map(brand => df.where($"k" <=> brand)) 
    BrandArray.foreach { x => 
    x.show() 
    println("---------------------------------------") 
    } 
    
  2. 您還可以保存基於製造商的數據幀。

    df.write.partitionBy("hour").saveAsTable("parquet")

+1

你能用Java發送代碼嗎? – Nischay