Nested tuples / nested columns in Spark: what is the best way to filter or group by a nested column? I am stuck on grouping by a nested column.
My application's Scala version is 2.11.7, and these are my SBT dependencies:
libraryDependencies ++= {
  val akkaVersion = "2.4.10"
  val sparkVersion = "2.1.1"
  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe" % "config" % "1.3.0",
    "org.apache.spark" %% "spark-core" % sparkVersion,
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
    "org.apache.spark" %% "spark-streaming" % sparkVersion
  )
}
Here is my sample data (one row):
124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]|
And here is my mapper (case class):
case class SMSMap(
  id: Long, //1
  a_id_1: Long, //2
  b_id_2: Long, //3
  date_time: String, //4
  subscriber_type: Int, //5
  x_type: Int, //6
  sub_id_2: String, //7
  account_type: Int, //11
  master_sub_id: String, //12
  application_id: Int, //13
  sup_type_id: Int, //14
  unit_type_id: Int, //15
  usage_amount: Long, //16
  type_of_charge: String, //17
  identity_id: Int, //18
  group_id: String, //19
  charge_code: String, //20
  content_type: Int, //21
  fund_usage_type: Int, //24
  msc_id: String, //28
  circle_id: Int, //29
  sp_id: Int, //30
  balance: List[(Int, Double, Double)], //31
  z_info: List[(Int, Double, Double)] //33
)
And here is the code I wrote to filter and map the data:
private def mappingSparkLoadedSMSData(sparkRdd: Dataset[String]): Dataset[SMSMap] = {
  import SparkFactory.spark.implicits._
  sparkRdd
    .map(_.split("\\|", -1))
    .filter(_.length == 33) // split with -1 keeps the trailing empty field, so a valid row has 33 fields
    .map(
      data =>
        SMSMap(
          { if (data(0).nonEmpty) data(0).toLong else 0 },
          { if (data(1).nonEmpty) data(1).toLong else 0 },
          { if (data(2).nonEmpty) data(2).toLong else 0 },
          data(3),
          { if (data(4).nonEmpty) data(4).toInt else 0 },
          { if (data(5).nonEmpty) data(5).toInt else 0 },
          data(6),
          { if (data(10).nonEmpty) data(10).toInt else 0 },
          data(11),
          { if (data(12).nonEmpty) data(12).toInt else 0 },
          { if (data(13).nonEmpty) data(13).toInt else 0 },
          { if (data(14).nonEmpty) data(14).toInt else 0 },
          { if (data(15).nonEmpty) data(15).toLong else 0 },
          data(16),
          { if (data(17).nonEmpty) data(17).toInt else 0 },
          data(18),
          data(19),
          { if (data(20).nonEmpty) data(20).toInt else 0 },
          { if (data(23).nonEmpty) data(23).toInt else 0 },
          data(27),
          { if (data(28).nonEmpty) data(28).toInt else 0 },
          { if (data(29).nonEmpty) data(29).toInt else 0 },
          data(30) // e.g. [1~26.927900~0.390200][4~0.000000~0.000000]...
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(bal => bal.length > 2 && bal(2).nonEmpty && bal(2).toDouble != 0)
            .map(bal => (bal(0).toInt, bal(1).toDouble, bal(2).toDouble)) // (bal_id, change_balance, accumulator)
            .toList,
          data(31)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(z => z.length > 2 && z(2).nonEmpty && z(2).toDouble != 0)
            .map(z => (z(0).toInt, z(1).toDouble, z(2).toDouble))
            .toList
        )
    )
}
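For completeness, a rough sketch of how this mapper might be fed follows; the file path and the loading call on SparkFactory.spark are assumptions, since the question does not show that part:

import org.apache.spark.sql.Dataset

// hypothetical loading step: read the raw pipe-delimited file as a Dataset[String]
// and pass it through the mapper above ("/path/to/sms_data.txt" is a placeholder)
val rawLines: Dataset[String] = SparkFactory.spark.read.textFile("/path/to/sms_data.txt")
val formattedRDD: Dataset[SMSMap] = mappingSparkLoadedSMSData(rawLines)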
Then I created a temporary view and tried to query the balance column like this:
formattedRDD.createOrReplaceTempView("temp_table") // formattedRDD is the Dataset returned by the mapping above

spark.sql(
  "select balance from temp_table group by balance"
).collectAsList()
Here balance is the field declared as balance: List[(Int, Double, Double)], //31. The first element of each tuple is bal_id (Int), the second is change_balance (Double), the third is an accumulated value (Double), and the list holds more than one tuple.
Now I want to group by bal_id and get the sum of change_balance, but I cannot do that with this query (of course I cannot, because each row's balance is a whole list of values).
My idea is to separate the balance column (balance: List[(Int, Double, Double)], //31) into a different Dataset/table and then map and group it there, but to keep the two Datasets/tables linked we would need an auto_increment_id or some other unique row identifier for both (note that id can be repeated). A rough sketch of this idea is shown below.
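A minimal sketch of that splitting idea, assuming formattedRDD is the Dataset[SMSMap] returned by the mapper above; the flattened column names passed to toDF are only illustrative:

import SparkFactory.spark.implicits._
import org.apache.spark.sql.functions.sum

// flatten: one output row per (bal_id, change_balance, accumulator) tuple,
// carrying the parent id along with each flattened row
val balanceRows = formattedRDD
  .flatMap(row => row.balance.map { case (balId, changeBalance, accumulator) =>
    (row.id, balId, changeBalance, accumulator)
  })
  .toDF("id", "bal_id", "change_balance", "accumulator")

// group by bal_id and sum change_balance
balanceRows
  .groupBy("bal_id")
  .agg(sum("change_balance").as("total_change_balance"))
  .show()

For the group-by-bal_id / sum-of-change_balance part this avoids the need for a unique row identifier, since the grouping only cares about bal_id.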
I am really confused about this. Can anyone please help me? Thanks in advance.
First of all, sorry for the late reply. Regarding 'balance(0)._1, // this is bal_id; balance(0)._2, // this is change_balance; balance(0)._3, // this is the accumulator': that selects only the first tuple, but the list contains more than one tuple. – Muhunthan