
Nested tuples or nested columns in Spark: what is the best way to filter or group by a nested column? My problem is grouping by a nested column.

My application's Scala version is 2.11.7, and these are my SBT dependencies:

libraryDependencies ++= { 
    val akkaVersion = "2.4.10" 
    val sparkVersion = "2.1.1" 

    Seq(
    "com.typesafe.akka" %% "akka-actor"       % akkaVersion, 
    "com.typesafe"  % "config"        % "1.3.0" , 
    "org.apache.spark" %% "spark-core"       % sparkVersion, 
    "org.apache.spark" %% "spark-sql"       % sparkVersion, 
    "com.typesafe.akka" %% "akka-slf4j"       % akkaVersion, 
    "org.apache.spark" %% "spark-streaming"      % sparkVersion 
) 
} 

This is my sample data (one row):

124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]| 

This is my mapper:

case class SampleMap(
        id: Long, //1 
        a_id_1: Long, //2 
        b_id_2: Long, //3 
        date_time: String, //4 
        subscriber_type: Int, //5 
        x_type: Int, //6 
        sub_id_2: String, //7 
        account_type: Int, //11 
        master_sub_id: String, //12 
        application_id: Int, //13 
        sup_type_id: Int, //14 
        unit_type_id: Int, //15 
        usage_amount: Long, //16 
        type_of_charge: String, //17 
        identity_id: Int, //18 
        group_id: String, //19 
        charge_code: String, //20 
        content_type: Int, //21 
        fund_usage_type: Int, //24 
        msc_id: String, //28 
        circle_id: Int, //29 
        sp_id: Int, //30 
        balance: List[(Int, Double, Double)], //31 
        z_info: List[(Int, Double, Double)] //33

       ) 

This is the code I wrote for filtering and mapping:

private def mappingSparkLoadedSMSData(sparkRdd: Dataset[String]): Dataset[SampleMap] = {

    import SparkFactory.spark.implicits._
    sparkRdd
      .map(_.split("\\|", -1))
      .filter(_.length == 33)  //adding last empty string
      .map(data =>
        SampleMap(

          { if (data(0).nonEmpty) data(0).toLong else 0 },
          { if (data(1).nonEmpty) data(1).toLong else 0 },
          { if (data(2).nonEmpty) data(2).toLong else 0 },
          data(3),
          { if (data(4).nonEmpty) data(4).toInt else 0 },
          { if (data(5).nonEmpty) data(5).toInt else 0 },
          data(6),
          { if (data(10).nonEmpty) data(10).toInt else 0 },
          data(11),
          { if (data(12).nonEmpty) data(12).toInt else 0 },
          { if (data(13).nonEmpty) data(13).toInt else 0 },
          { if (data(14).nonEmpty) data(14).toInt else 0 },
          { if (data(15).nonEmpty) data(15).toLong else 0 },
          data(16),
          { if (data(17).nonEmpty) data(17).toInt else 0 },
          data(18),
          data(19),
          { if (data(20).nonEmpty) data(20).toInt else 0 },
          { if (data(23).nonEmpty) data(23).toInt else 0 },
          data(27),
          { if (data(28).nonEmpty) data(28).toInt else 0 },
          { if (data(29).nonEmpty) data(29).toInt else 0 },

          //balance (//31): parse the "[a~b~c]" blocks into (Int, Double, Double) tuples
          data(30)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble))
            .toList,

          //z_info (//33): the same parsing applied to the second nested field
          data(31)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble))
            .toList
        )
      )
  }

Then I created a temporary view and tried to query it like this:

formattedRDD.createOrReplaceTempView("temp_table") //formattedRDD is a val that stored after Mapping 

spark.sql(
     s" select balance from temp_table group by balance" 
    ).collectAsList() 

When you look at the balance column (balance: List[(Int, Double, Double)], //31), the first element is bal_id (Int), the second is change_balance (Double) and the third is accumulated (Double), and one row can have more than one set.
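For context on why the query above does not help: Spark encodes a List[(Int, Double, Double)] field as an array of structs with fields _1, _2 and _3, so grouping on the whole balance column groups on entire lists. A quick way to see this (assuming formattedRDD is the mapped Dataset from above) is:

formattedRDD.printSchema()
// balance is reported as array<struct<_1: int, _2: double, _3: double>>,
// so "group by balance" compares whole lists, not individual bal_id values.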

Now I want to group by bal_id and get the sum of change_balance, but I cannot do it this way (of course it cannot work like this, because every balance value is a different list).

I have the idea of separating balance (balance: List[(Int, Double, Double)], //31) into a different dataset/table and doing the mapping and grouping there, but to separate it we need to add an auto_increment_id or some other unique row identifier to both datasets/tables (note that id can be duplicated).
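One way to sketch that idea without a real auto_increment_id column is to generate a surrogate key and flatten the nested list into its own table. This is only a rough sketch, assuming formattedRDD is the mapped Dataset from above; monotonically_increasing_id produces unique (not consecutive) ids:

import org.apache.spark.sql.functions.{col, explode, monotonically_increasing_id}

// Give every parent row a surrogate key, since the business id can repeat.
val parent = formattedRDD.withColumn("row_id", monotonically_increasing_id())

// Child table: one row per (row_id, bal_id, change_balance, accumulated),
// produced by exploding the nested balance list.
val balanceTable = parent
  .select(col("row_id"), explode(col("balance")).as("bal"))
  .select(
    col("row_id"),
    col("bal._1").as("bal_id"),
    col("bal._2").as("change_balance"),
    col("bal._3").as("accumulated")
  )

balanceTable.createOrReplaceTempView("balance_table")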

I am really confused by this. Can anyone please help me? Thanks in advance.

Answer


If you separate the balance column into three different columns, then you can easily groupBy bal_id and sum change_balance.
You can create these three separate columns at the initial stage.
Below is the solution based on what I understood from your question.

You need three more column names in your case class:

case class SampleMap(
         id: Long, //1 
         a_id_1: Long, //2 
         b_id_2: Long, //3 
         date_time: String, //4 
         subscriber_type: Int, //5 
         x_type: Int, //6 
         sub_id_2: String, //7 
         account_type: Int, //11 
         master_sub_id: String, //12 
         application_id: Int, //13 
         sup_type_id: Int, //14 
         unit_type_id: Int, //15 
         usage_amount: Long, //16 
         type_of_charge: String, //17 
         identity_id: Int, //18 
         group_id: String, //19 
         charge_code: String, //20 
         content_type: Int, //21 
         fund_usage_type: Int, //24 
         msc_id: String, //28 
         circle_id: Int, //29 
         sp_id: Int, //30 
         balance: List[(Int, Double, Double)], //31 
         bal_id: Int,    //added by Ramesh 
         change_balance: Double, //added by Ramesh 
         accumulated: Double,  //added by Ramesh 
         z_info: List[(Int, Double, Double)] //33 
        ) 

You need to separate those three values into their own columns while creating the dataframe/dataset. Here is an improved version of your code:

import scala.util.Try
import SparkFactory.spark.implicits._  //needed for .toDS() at the end

val formattedRDD = sparkRdd.map(_.split("\\|",-1)) 
     .filter(_.length==33)  //adding last empty string 
     .map(data => { 
     val balance = Try(data(30) 
      .drop(1) 
      .dropRight(1) 
      .split("\\]\\[") 
      .map(_.split('~')) 
      .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0) 
      .map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble)) 
      .toList) getOrElse List((0, 0.0, 0.0)) 

     SampleMap(
      Try(data(0).toLong) getOrElse 0, 
      Try(data(1).toLong) getOrElse 0, 
      Try(data(2).toLong) getOrElse 0, 
      Try(data(3).toString) getOrElse "", 
      Try(data(4).toInt) getOrElse 0, 
      Try(data(5).toInt) getOrElse 0, 
      Try(data(6).toString) getOrElse "", 
      0, 
      Try(data(11).toString) getOrElse "", 
      Try(data(12).toInt) getOrElse 0, 
      Try(data(13).toInt) getOrElse 0, 
      Try(data(14).toInt) getOrElse 0, 
      Try(data(15).toLong) getOrElse 0, 
      Try(data(16).toString) getOrElse "", 
      Try(data(17).toInt) getOrElse 0, 
      Try(data(18).toString) getOrElse "", 
      Try(data(19).toString) getOrElse "", 
      Try(data(20).toInt) getOrElse 0, 
      Try(data(23).toInt) getOrElse 0, 
      Try(data(27).toString) getOrElse "", 
      Try(data(28).toInt) getOrElse 0, 
      Try(data(29).toInt) getOrElse 0, 
      balance,    //this is the 30th value i.e. balance 
      balance(0)._1,   //this is bal_id 
      balance(0)._2,   //this is change_balance 
      balance(0)._3,   //this is accumulator 

      Try(data(31) 
      .drop(1) 
      .dropRight(1) 
      .split("\\]\\[") 
      .map(_.split('~')) 
      .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0) 
      .map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble)) 
      .toList) getOrElse List.empty 
     ) 
     } 
    ) 
    .toDS() 

Now all you need to do is call the aggregation:

import org.apache.spark.sql.functions.sum 
formattedRDD.groupBy("bal_id").agg(sum("change_balance")).show 

I hope this is the solution you need.


First of all, sorry for the late reply. Regarding balance(0)._1 (bal_id), balance(0)._2 (change_balance) and balance(0)._3 (accumulator): you are only selecting the first tuple, but the list has more than one tuple. – Muhunthan
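As a follow-up to this comment, one way to aggregate over every tuple in the list rather than only balance(0) is to explode the balance array before grouping. A minimal sketch, assuming the same formattedRDD as above:

import org.apache.spark.sql.functions.{col, explode, sum}

formattedRDD
  .select(explode(col("balance")).as("bal"))   // one output row per (bal_id, change_balance, accumulated) tuple
  .select(col("bal._1").as("bal_id"), col("bal._2").as("change_balance"))
  .groupBy("bal_id")
  .agg(sum("change_balance").as("total_change_balance"))
  .show()

This sums change_balance across all tuples in all rows for each bal_id, which matches the grouping described in the question.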