2016-10-13 23 views
0

我對Spark上的Scala很陌生,想知道如何創建鍵值對,並且鍵有多個元素。例如,我有這樣的數據集的嬰兒的名字:按鍵排序,但值有一個以上的元素使用斯卡拉

年,名稱,縣,數

2000,約翰國王,50

2000年,BOB,國王,40

2000年, MARY,NASSAU,60

2001,JOHN,KINGS,14

2001,簡,KINGS,30

2001,BOB,NASSAU,45

我想找到每個縣的最頻繁發生,無論年份。我該怎麼做呢?

我用循環完成了這個。請參閱下文。但我想知道是否有更短的方法來實現這個利用Spark和Scala二元性的方法。 (即我可以減少計算時間嗎?)

val names = sc.textFile("names.csv").map(l => l.split(",")) 

val uniqueCounty = names.map(x => x(2)).distinct.collect 

for (i <- 0 to uniqueCounty.length-1) { 
    val county = uniqueCounty(i).toString; 
    val eachCounty = names.filter(x => x(2) == county).map(l => (l(1),l(4))).reduceByKey((a,b) => a + b).sortBy(-_._2); 
    println("County:" + county + eachCounty.first) 
} 
+0

@maasg請參考上面。我按原文發佈。 – ScalaNewb

回答

0

下面是使用RDD的溶液。我假設你需要每縣的頂級名稱。

val data = Array((2000, "JOHN", "KINGS", 50),(2000, "BOB", "KINGS", 40),(2000, "MARY", "NASSAU", 60),(2001, "JOHN", "KINGS", 14),(2001, "JANE", "KINGS", 30),(2001, "BOB", "NASSAU", 45)) 
val rdd = sc.parallelize(data) 
//Reduce the uniq values for county/name as combo key 
val uniqNamePerCountyRdd = rdd.map(x => ((x._3,x._2),x._4)).reduceByKey(_+_) 
// Group names per county. 
val countyNameRdd = uniqNamePerCountyRdd.map(x=>(x._1._1,(x._1._2,x._2))).groupByKey() 
// Sort and take the top name alone per county 
countyNameRdd.mapValues(x => x.toList.sortBy(_._2).take(1)).collect 

輸出:

res8: Array[(String, List[(String, Int)])] = Array((KINGS,List((JANE,30))), (NASSAU,List((BOB,45)))) 
+0

這個伎倆!謝謝。我只需添加一行: – ScalaNewb

+0

babyNames.map(x =>(x(1),x(2),toInt(x(4)))) – ScalaNewb

+0

將RDD放入正確的數組格式。 – ScalaNewb

0

您可以使用spark-csv和Dataframe API。如果您使用Spark(2.0)的新版本,則略有不同。 Spark 2.0有一個基於spark-csv的本地csv數據源。

使用spark-csv將您的csv文件加載到Dataframe中。

val df = sqlContext.read.format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .load(new File(getClass.getResource("/names.csv").getFile).getAbsolutePath) 
df.show 

給出輸出:

+----+----+------+------+ 
|Year|Name|County|Number| 
+----+----+------+------+ 
|2000|JOHN| KINGS| 50| 
|2000| BOB| KINGS| 40| 
|2000|MARY|NASSAU| 60| 
|2001|JOHN| KINGS| 14| 
|2001|JANE| KINGS| 30| 
|2001| BOB|NASSAU| 45| 
+----+----+------+------+ 

DataFrames使用一組結構化數據操縱操作的。你可以使用一些基本的操作來獲得你的結果。

import org.apache.spark.sql.functions._ 
df.select("County","Number").groupBy("County").agg(max("Number")).show 

給出輸出:

+------+-----------+ 
|County|max(Number)| 
+------+-----------+ 
|NASSAU|   60| 
| KINGS|   50| 
+------+-----------+ 

這是你要實現的目標是什麼?

請注意agg()函數所需的import org.apache.spark.sql.functions._

更多information約Dataframes API

EDIT

對於正確的輸出:

df.registerTempTable("names") 

//there is probably a better query for this 
sqlContext.sql("SELECT * FROM (SELECT Name, County,count(1) as Occurrence FROM names GROUP BY Name, County ORDER BY " + 
    "count(1) DESC) n").groupBy("County", "Name").max("Occurrence").limit(2).show 

給出輸出:

+------+----+---------------+ 
|County|Name|max(Occurrence)| 
+------+----+---------------+ 
| KINGS|JOHN|    2| 
|NASSAU|MARY|    1| 
+------+----+---------------+