2016-09-12 83 views
0

如何在RDD/DF中創建List/Map以便獲取聚合?SPARK:如何在Scala中創建RDD [Row]的集合

我有一個文件,其中每行是一個JSON對象:

{ 
itemId :1122334, 

language: [ 
     { 
      name: [ 
       "US", "FR" 
      ], 
      value: [ 
       "english", "french" 
      ] 
     }, 
     { 
      name: [ 
       "IND" 
      ], 
      value: [ 
       "hindi" 
      ] 
     } 
    ], 

country: [ 
    { 
     US: [ 
      { 
       startTime: 2016-06-06T17: 39: 35.000Z, 
       endTime: 2016-07-28T07: 00: 00.000Z 
      } 
     ], 
     CANADA: [ 
      { 
       startTime: 2016-06-06T17: 39: 35.000Z, 
       endTime: 2016-07-28T07: 00: 00.000Z 
      } 
     ], 
     DENMARK: [ 
      { 
       startTime: 2016-06-06T17: 39: 35.000Z, 
       endTime: 2016-07-28T07: 00: 00.000Z 
      } 
     ], 
     FRANCE: [ 
      { 
       startTime: 2016-08-06T17: 39: 35.000Z, 
       endTime: 2016-07-28T07: 00: 00.000Z 
      } 
     ] 
    } 
] 
}, 

{ 
itemId :1122334, 

language: [ 
     { 
      name: [ 
       "US", "FR" 
      ], 
      value: [ 
       "english", "french" 
      ] 
     }, 
     { 
      name: [ 
       "IND" 
      ], 
      value: [ 
       "hindi" 
      ] 
     } 
    ], 

country: [ 
    { 
     US: [ 
      { 
       startTime: 2016-06-06T17: 39: 35.000Z, 
       endTime: 2016-07-28T07: 00: 00.000Z 
      } 
     ], 
     CANADA: [ 
      { 
       startTime: 2016-07-06T17: 39: 35.000Z, 
       endTime: 2016-07-28T07: 00: 00.000Z 
      } 
     ], 
     DENMARK: [ 
      { 
       startTime: 2016-06-06T17: 39: 35.000Z, 
       endTime: 2016-07-28T07: 00: 00.000Z 
      } 
     ], 
     FRANCE: [ 
      { 
       startTime: 2016-08-06T17: 39: 35.000Z, 
       endTime: 2016-07-28T07: 00: 00.000Z 
      } 
     ] 
    } 
] 
} 

我有匹配的POJO這讓我從JSON值。

import com.mapping.data.model.MappingUtils 
import com.mapping.data.model.CountryInfo 


val mappingPath = "s3://.../" 

val timeStamp = "2016-06-06T17: 39: 35.000Z" 
val endTimeStamp = "2016-06-07T17: 39: 35.000Z" 


val COUNTRY_US = "US" 
val COUNTRY_CANADA = "CANADA" 
val COUNTRY_DENMARK = "DENMARK" 
val COUNTRY_FRANCE = "FRANCE" 


val input = sc.textFile(mappingPath) 

輸入是jsons的列表,其中每行是JSON這我映射到POJO類CountryInfo使用MappingUtils它負責JSON解析和轉換:

val MappingsList = input.map(x=> { 
        val countryInfo = MappingUtils.getCountryInfoString(x); 
        (countryInfo.getItemId(), countryInfo) 
       }).collectAsMap 

MappingsList: scala.collection.Map[String,com.mapping.data.model.CountryInfo] 


def showCountryInfo(x: Option[CountryInfo]) = x match { 
     case Some(s) => s 
    } 

但我需要創建DF/RDD,這樣我就可以根據itemId獲得國家和語言的彙總。

在給出的例子中,如果國家的起始時間不小於「2016-06-07T17:39:35.000Z」,那麼該值將爲零。

哪種格式會好創建最終的總JSON:

1. List ? 

    |-----itemId-------|----country-------------------|-----language---------------------| 
    |  1122334  | [US, CANADA,DENMARK]  |  [english,hindi,french]  | 
    |  1122334  | [US,DENMARK]    |  [english]     | 
    |------------------|------------------------------|----------------------------------| 

2. Map ?  



|-----itemId-------|----country---------------------------------|-----language---------------------| 
    |  1122334  | (US,2) (CANADA,1) (DENMARK,2) (FRANCE, 0) |(english,2) (hindi,1) (french,1) | 
       |....                        | 
       |....                        |  
       |....                        | 
       |------------------|--------------------------------------------|----------------------------------| 

我想創造出具有類似的總價值最終JSON:

{ 
    itemId: "1122334", 
    country: { 
     "US" : 2, 
     "CANADA" : 1, 
     "DENMARK" : 2, 
     "FRANCE" : 0 

    }, 
    language: { 
     "english" : 2, 
     "french" : 1, 
     "hindi" : 1 
    } 
    } 

我試圖名單:

val events = sqlContext.sql("select itemId EventList") 

    val itemList = events.map(row => { 
     val itemId = row.getAs[String](1); 
     val countryInfo = showTitleInfo(MappingsList.get(itemId)); 

     val country = new ListBuffer[String]() 
     country += if (countryInfo.getCountry().getUS().get(0).getStartTime() < endTimeStamp) COUNTRY_US; 
     country += if (countryInfo.getCountry().getCANADA().get(0).getStartTime() < endTimeStamp) COUNTRY_CANADA; 
     country += if (countryInfo.getCountry().getDENMARK().get(0).getStartTime() < endTimeStamp) COUNTRY_DENMARK; 
     country += if (countryInfo.getCountry().getFRANCE().get(0).getStartTime() < endTimeStamp) COUNTRY_FRANCE; 

     val languageList = new ListBuffer[String]() 
     val language = countryInfo.getLanguages().collect.foreach(x => languageList += x.getValue()); 

     Row(itemId, country.toList, languageList.toList) 
      }) 

and Map:

val itemList = events.map(row => { 
    val itemId = row.getAs[String](1); 
    val countryInfo = showTitleInfo(MappingsList.get(itemId)); 

    val country: Map[String, Int] = Map() 
    country += if (countryInfo.getCountry().getUS().get(0).getStartTime() < endTimeStamp) ('COUNTRY_US' -> 1) else ('COUNTRY_US' -> 0) 
    country += if (countryInfo.getCountry().getUS().get(0).getStartTime() < endTimeStamp) ('COUNTRY_CANADA' -> 1) else ('COUNTRY_CANADA' -> 0) 
    country += if (countryInfo.getCountry().getUS().get(0).getStartTime() < endTimeStamp) ('COUNTRY_DENMARK' -> 1) else ('COUNTRY_DENMARK' -> 0) 
    country += if (countryInfo.getCountry().getUS().get(0).getStartTime() < endTimeStamp) ('COUNTRY_FRANCE' -> 1) else ('COUNTRY_FRANCE' -> 0) 


    val language: Map[String, Int] = Map() 
    countryInfo.getLanguages().collect.foreach(x => language += (x.getValue -> 1)) ; 

    Row(itemId, country, language) 
     }) 

但兩者都在齊柏林飛船上凍結。有沒有更好的方式來獲得作爲JSON聚合?哪個更好List/Map構造最終的聚合?

回答

0

如果用Spark DataFrame/Dataset和Row重新表述問題將會很有幫助;我知道你最終想要使用JSON,但JSON輸入/輸出的細節是一個單獨的問題。

您正在查找的功能是Spark SQL aggregate function(請參閱該頁面上的組)。功能collect_listcollect_set是相關的,但是你需要的功能還沒有實現。

您可以實施我所謂的count_by_value通過派生自org.spark.spark.sql.expressions.UserDefinedAggregateFunction。這需要深入瞭解Spark SQL的工作原理。

一旦count_by_value實現,你可以使用它像這樣:

df.groupBy("itemId").agg(count_by_value(df("country")), count_by_value(df("language"))) 
相關問題