2017-07-02 73 views
0

我有以下數據:取消組合分組的表Scala中

53,Male,11th,<=50K 
53,Male,11th,<=50K 
53,Male,11th,<=50K 
20,Female,Masters,>50K 
20,Female,Masters,>50K 
33,Male,Bachelors,<=50K 

接下來,我通過使用選擇和組需要組以上的數據。所以它會是這樣的:

53,Male,11th,<=50K,3 
20,Female,Masters,>50K,2 
33,Male,Bachelors,<=50K,1 

其中最後一個數字顯示類似記錄的數量。現在我需要過濾等效記錄數> 2,並將其存儲在單獨的文件中

我已經通過Scala命令中的SQL查詢對數據進行了分組。爲了取消分組數據,我可以創建一個表格並通過(插入命令)並逐行添加分組數據。它可以工作,但速度非常慢,只需要幾個小時就可以拍攝幾張唱片。有沒有什麼想法使用斯卡拉非常感謝。 命令如下所示:

import spark.sqlContext.implicits._ 
import scala.collection.immutable.Map 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} 


case class Rating(age: Double,edu: String, sex: String, salary: String) 

val Result = sc.textFile("hdfs://NameNode01:9000/input/adult.csv").map(_.split(",")).map(p => Rating(p(0).trim.toDouble,p(1),p(2),p(3))).toDF() 
Result.registerTempTable("Start") 

val sal1=spark.sqlContext.sql("SELECT age,edu,sex,salary,count(*) as cnt from Start group by age,edu,sex,salary") 
sal1.registerTempTable("adult") 

val sal2=spark.sqlContext.sql("SELECT age,edu,sex,salary,cnt from adult WHERE cnt>3") 
sal2.registerTempTable("adult2") 

var ag=sal2.map(age => ""+age(0)).collect() 
var ed=sal2.map(edu => ""+edu(1)).collect() 
var se=sal2.map(sex => ""+sex(2)).collect() 
var sa=sal2.map(salary => ""+salary(3)).collect() 
var cn=sal2.map(cnt => ""+cnt(4)).collect() 

//convert age to double 
val ages= ag.map(_.toDouble) 

//convert the cnt to integer 
val counts= cn.map(_.toInt) 

//length of the array 
var cnt_length=counts.size 


//create a table and add the sal2 records in it 
val adlt2=spark.sqlContext.sql("CREATE TABLE adult3 (age double, edu string, sex string, salary string)") 


//loop and enter the number of cn 
var sql_querys="query" 
var i=0 
var j=0 
var loop_cnt=0 

for(i <-0 to cnt_length-1){ 
    loop_cnt=counts(i) 
    for(j <-0 to loop_cnt-1){ 
     sql_querys="INSERT into adult3 values ("+ages(i)+",'"+ed(i)+"','"+se(i)+"','"+sa(i)+"')" 

     val adlt3=spark.sqlContext.sql("INSERT into adult3 values ("+ages(i)+",'"+ed(i)+"','"+se(i)+"','"+sa(i)+"')") 
    } 

} 

的主要部分是在代碼的結束時的循環。

+0

我不知道你究竟問..你能否給出'sal1','sal2'的數據示例和所需的輸出? – shakedzy

+0

你可以編輯你的帖子。 – mtoto

回答

1

你可能要考慮按照groupBy計數取消組合使用explode您的數據框:

import org.apache.spark.sql.functions._ 

case class Rating(age: Double, edu: String, sex: String, salary: String) 

val Result = sc.textFile("/Users/leo/projects/spark/files/testfile.csv"). 
    map(_.split(",")). 
    map(p => Rating(p(0).trim.toDouble, p(1).trim, p(2).trim, p(3).trim)). 
    toDF 

val saDF1 = Result.groupBy("age", "edu", "sex", "salary").agg(count("*") as "cnt") 

val saDF2 = Result.groupBy("age", "edu", "sex", "salary").agg(count("*") as "cnt").where($"cnt" > 2) 

// Create a UDF to fill array of 1's to be later exploded 
val fillArr = (n: Int) => Array.fill(n)(1) 
val fillArrUDF = udf(fillArr) 

val expandedDF1 = saDF1.withColumn("arr", fillArrUDF($"cnt")) 

expandedDF1.show 
+----+------+---------+------+---+---------+ 
| age| edu|  sex|salary|cnt|  arr| 
+----+------+---------+------+---+---------+ 
|33.0| Male|Bachelors| <=50K| 1|  [1]| 
|20.0|Female| Masters| >50K| 2| [1, 1]| 
|53.0| Male|  11th| <=50K| 3|[1, 1, 1]| 
+----+------+---------+------+---+---------+ 

// Ungroup dataframe using explode 
val ungroupedDF1 = expandedDF1.withColumn("a", explode($"arr")). 
    select("age", "edu", "sex", "salary") 

ungroupedDF1.show 
+----+------+---------+------+ 
| age| edu|  sex|salary| 
+----+------+---------+------+ 
|33.0| Male|Bachelors| <=50K| 
|20.0|Female| Masters| >50K| 
|20.0|Female| Masters| >50K| 
|53.0| Male|  11th| <=50K| 
|53.0| Male|  11th| <=50K| 
|53.0| Male|  11th| <=50K| 
+----+------+---------+------+ 
2

這裏是一個較短的解決方案,它僅使用RDDS:

val result = sc 
    .textFile("hdfs://NameNode01:9000/input/adult.csv") 
    .map({ (line: String) => 
    val p = line.split(",") 
    (Rating(p(0).trim.toDouble,p(1),p(2),p(3)), 1) 
    }) 
    .reduceByKey(_ + _) 
    .filter(_._2 > 2) 
    .flatMap(rating => Array.fill(rating._2)(rating._1)) 

其工作原理如下:

  • textfile將rdd從文件
  • map變換線對的形式的(rating, 1)
  • reduceByKey組對由rating和求和1S(即統計每個等級的出現)
  • filter丟棄其出現次數少於3次
  • flatmap重複每個等級這麼多次作爲計數的收視率,然後變平,所有的結果到一個RDD

這裏是初始方法無法執行的一些原因:

  1. collectcollect在數據幀上用於讀取本地計算機上的內容。這意味着你直接放棄了Spark的所有並行和集羣優勢。
  2. for循環執行單個數據幀的插入。火花對象的可用轉換(例如,map,filter,reduce,單個sql查詢)經過高度優化,以分佈式方式執行這些操作。通過使用for循環執行單行操作,您將失去此優勢,並且您可能會因循環中每次迭代期間複製的數據幀而承受極大的開銷。
  3. (次要)將RDD轉換爲數據幀會增加一些額外的計算成本。因此,除非您打算執行幾項可從數據框或數據集的性能特徵中受益的操作,否則我會建議僅使用rdds來簡化操作。
1

根據我的理解你的問題,你想過濾掉大於2的相似記錄並寫入文件。如果那麼如此如此的可以成爲你的解決方案。

您必須已經有原來的數據幀作爲

+----+------+---------+------+ 
|age |edu |sex  |salary| 
+----+------+---------+------+ 
|53.0|Male |11th  |<=50K | 
|53.0|Male |11th  |<=50K | 
|53.0|Male |11th  |<=50K | 
|20.0|Female|Masters |>50K | 
|20.0|Female|Masters |>50K | 
|33.0|Male |Bachelors|<=50K | 
+----+------+---------+------+ 

你不需要編寫複雜的SQL查詢找到數,你可以使用內置的功能

val columnNames = Result.columns 
val finalTemp = Result.groupBy(columnNames.map(col): _*).agg(count("salary").as("similar records")) 

這應該給輸出爲

+----+------+---------+------+---------------+ 
|age |edu |sex  |salary|similar records| 
+----+------+---------+------+---------------+ 
|33.0|Male |Bachelors|<=50K |1    | 
|20.0|Female|Masters |>50K |2    | 
|53.0|Male |11th  |<=50K |3    | 
+----+------+---------+------+---------------+ 

現在要過濾,您可以使用過濾功能作爲

val finalTable = finalTemp.filter($"similar records" < 3) 

最終輸出是

+----+------+---------+------+---------------+ 
|age |edu |sex  |salary|similar records| 
+----+------+---------+------+---------------+ 
|33.0|Male |Bachelors|<=50K |1    | 
|20.0|Female|Masters |>50K |2    | 
+----+------+---------+------+---------------+ 

可以將其保存到一個文件

finalTable.write.format("com.databricks.spark.csv").save("output path") 

如果你想過濾掉,那麼你可以簡單地使用原始數據合併爲

Result.join(finalTable, Seq(columnNames: _*)).show(false) 

輸出是

+----+------+---------+------+---------------+ 
|age |edu |sex  |salary|similar records| 
+----+------+---------+------+---------------+ 
|33.0|Male |Bachelors|<=50K |1    | 
|20.0|Female|Masters |>50K |2    | 
|20.0|Female|Masters |>50K |2    | 
+----+------+---------+------+---------------+ 

你可以將它保存到一個文件作爲上述

注意:您將需要以下導入了上述功能的工作

import org.apache.spark.sql.functions._