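How do I build a List/Map in an RDD/DataFrame so I can compute aggregates? Spark: how to create a collection from RDD[Row] in Scala
I have a file in which each line is a JSON object: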
{
  "itemId": 1122334,
  "language": [
    { "name": ["US", "FR"], "value": ["english", "french"] },
    { "name": ["IND"], "value": ["hindi"] }
  ],
  "country": [
    {
      "US": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "CANADA": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "DENMARK": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "FRANCE": [
        { "startTime": "2016-08-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ]
    }
  ]
},
{
  "itemId": 1122334,
  "language": [
    { "name": ["US", "FR"], "value": ["english", "french"] },
    { "name": ["IND"], "value": ["hindi"] }
  ],
  "country": [
    {
      "US": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "CANADA": [
        { "startTime": "2016-07-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "DENMARK": [
        { "startTime": "2016-06-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ],
      "FRANCE": [
        { "startTime": "2016-08-06T17:39:35.000Z", "endTime": "2016-07-28T07:00:00.000Z" }
      ]
    }
  ]
}
I have a matching POJO that gives me the values from the JSON.
import com.mapping.data.model.MappingUtils
import com.mapping.data.model.CountryInfo
val mappingPath = "s3://.../"
val timeStamp = "2016-06-06T17:39:35.000Z"
val endTimeStamp = "2016-06-07T17:39:35.000Z"
val COUNTRY_US = "US"
val COUNTRY_CANADA = "CANADA"
val COUNTRY_DENMARK = "DENMARK"
val COUNTRY_FRANCE = "FRANCE"
val input = sc.textFile(mappingPath)
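The input is a list of JSON strings, one JSON object per line. I map each line to the POJO class CountryInfo using MappingUtils, which takes care of the JSON parsing and conversion: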
val MappingsList = input.map(x => {
  val countryInfo = MappingUtils.getCountryInfoString(x)
  (countryInfo.getItemId(), countryInfo)
}).collectAsMap
MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo]
def showCountryInfo(x: Option[CountryInfo]) = x match {
  case Some(s) => s
}
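But I need to create a DF/RDD so that I can get the country and language aggregates per itemId.
In the given example, if a country's start time is not earlier than "2016-06-07T17:39:35.000Z", its value is zero.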
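To make that rule concrete, here is a minimal plain-Scala sketch (no Spark involved). It assumes the timestamps are well-formed ISO-8601 strings such as "2016-06-06T17:39:35.000Z"; `StartTimeRule` and `countryFlag` are hypothetical names used only for illustration and are not part of my POJO or MappingUtils.

```scala
import java.time.Instant

object StartTimeRule {
  // Hypothetical helper: returns 1 if startTime is strictly earlier than
  // the cutoff, otherwise 0. Both arguments are assumed to be ISO-8601
  // instants that java.time.Instant.parse accepts.
  def countryFlag(startTime: String, cutoff: String): Int =
    if (Instant.parse(startTime).isBefore(Instant.parse(cutoff))) 1 else 0
}
```

With the cutoff above, a start time of 2016-06-06 counts as 1 and a start time of 2016-07-06 or 2016-08-06 counts as 0.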
Which format would be better for building the final aggregated JSON:
1. List?
|-----itemId-------|----country-------------------|-----language---------------------|
| 1122334          | [US, CANADA, DENMARK]        | [english, hindi, french]         |
| 1122334          | [US, DENMARK]                | [english]                        |
|------------------|------------------------------|----------------------------------|
2. Map?
|-----itemId-------|----country---------------------------------|-----language---------------------|
| 1122334          | (US,2) (CANADA,1) (DENMARK,2) (FRANCE,0)   | (english,2) (hindi,1) (french,1) |
| ....             |                                            |                                  |
| ....             |                                            |                                  |
| ....             |                                            |                                  |
|------------------|--------------------------------------------|----------------------------------|
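For comparison, the List form can be turned into the same counts by flattening the per-record lists and counting occurrences. This is only a plain-Scala sketch of the idea, not Spark code, and `ListCounts` / `countOccurrences` are hypothetical names. One difference it shows: a country that never passes the filter (FRANCE in the example) simply does not appear in the result, whereas the Map form can carry an explicit 0.

```scala
object ListCounts {
  // Flatten the per-record lists and count how often each value occurs.
  // Values that appear in no list are absent from the result (no 0 entry).
  def countOccurrences(lists: Seq[List[String]]): Map[String, Int] =
    lists.flatten.groupBy(identity).map { case (k, vs) => k -> vs.size }
}
```

Applied to the two country lists in option 1, this gives US 2, CANADA 1 and DENMARK 2, with no entry for FRANCE.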
I want to produce a final JSON with aggregate values like this:
{
itemId: "1122334",
country: {
"US" : 2,
"CANADA" : 1,
"DENMARK" : 2,
"FRANCE" : 0
},
language: {
"english" : 2,
"french" : 1,
"hindi" : 1
}
}
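The aggregation I am after amounts to adding per-record count maps key by key for each itemId. Here is a small plain-Scala sketch of that merge, with hypothetical names (`CountMerge`, `mergeCounts`, `aggregate`) and without any Spark dependency. The merge function is associative and commutative, which is the shape a function passed to an RDD's `reduceByKey` needs, though I have not verified this approach on my cluster.

```scala
object CountMerge {
  // Add two count maps key by key; keys missing on one side count as 0.
  def mergeCounts(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }

  // Group (itemId, counts) pairs by itemId and merge the maps in each group.
  def aggregate(records: Seq[(String, Map[String, Int])]): Map[String, Map[String, Int]] =
    records.groupBy(_._1).map { case (id, rs) => id -> rs.map(_._2).reduce(mergeCounts) }
}
```

Feeding it the two per-record country maps for item 1122334 (1/1/1/0 and 1/0/1/0 for US/CANADA/DENMARK/FRANCE) yields the country object shown in the JSON above.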
I tried a List:
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.Row

val events = sqlContext.sql("select itemId from EventList")
val itemList = events.map(row => {
  val itemId = row.getAs[String]("itemId")
  val countryInfo = showCountryInfo(MappingsList.get(itemId))
  // Keep only the countries whose start time is earlier than endTimeStamp.
  // Assumes getStartTime() returns an ISO-8601 String, so string order matches time order.
  val country = new ListBuffer[String]()
  if (countryInfo.getCountry().getUS().get(0).getStartTime() < endTimeStamp) country += COUNTRY_US
  if (countryInfo.getCountry().getCANADA().get(0).getStartTime() < endTimeStamp) country += COUNTRY_CANADA
  if (countryInfo.getCountry().getDENMARK().get(0).getStartTime() < endTimeStamp) country += COUNTRY_DENMARK
  if (countryInfo.getCountry().getFRANCE().get(0).getStartTime() < endTimeStamp) country += COUNTRY_FRANCE
  // Assumes getLanguages() returns a java.util.List and getValue() returns a String.
  val languageList = new ListBuffer[String]()
  countryInfo.getLanguages().asScala.foreach(x => languageList += x.getValue())
  Row(itemId, country.toList, languageList.toList)
})
and Map:
import scala.collection.JavaConverters._

val itemList = events.map(row => {
  val itemId = row.getAs[String]("itemId")
  val countryInfo = showCountryInfo(MappingsList.get(itemId))
  val c = countryInfo.getCountry()
  // 1 if the start time is earlier than endTimeStamp, otherwise 0.
  // Assumes getStartTime() returns an ISO-8601 String.
  def flag(startTime: String): Int = if (startTime < endTimeStamp) 1 else 0
  val country: Map[String, Int] = Map(
    COUNTRY_US -> flag(c.getUS().get(0).getStartTime()),
    COUNTRY_CANADA -> flag(c.getCANADA().get(0).getStartTime()),
    COUNTRY_DENMARK -> flag(c.getDENMARK().get(0).getStartTime()),
    COUNTRY_FRANCE -> flag(c.getFRANCE().get(0).getStartTime())
  )
  // Assumes getLanguages() returns a java.util.List and getValue() returns a String.
  val language: Map[String, Int] =
    countryInfo.getLanguages().asScala.map(x => x.getValue() -> 1).toMap
  Row(itemId, country, language)
})
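But both versions freeze in Zeppelin. Is there a better way to get the aggregate as JSON? And which is the better structure for building the final aggregate, a List or a Map?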