2017-11-11 118 views
0

查找我做的火花項目和需要就如何解決的最佳方式如下問題:火花從一個小文件

我有一個數據幀(說MainDF),其中有上百萬的記錄。格式是這樣的(name:String,value:Int)。下面的內容例如:

Davi,130 
Joel,20 
Emma,500 

我還有一個小文件,與4號線的記錄,像這樣(的className:字符串,minValue(最小值):詮釋,包括maxValue:智力) 現在我需要創建通過查找類文件名基於最大和最小,爲低於上述記錄輸出之間的值:

First,500,9999999 
Second,100,499 
Third,0,99 
Unknown,-99999,0 

我需要根據從值範圍查找在MainDF每個值​​這個小文件,並添加類名small File.Example:

Davi,130,Second 
Joel,20,Third 
Emma,500,First 

這是我寫的代碼:

//Main Data read, millions of records 
val MainData = sc.textFile("/mainfile.csv") 
case class MainType(Name:String,value:Int) 
val MainDF = MainData .map(line => line.split(",")).map(e =>MainType(e(0),e(1).toInt))).toDF 
MainDF.registerTempTable("MainTable") 
val refData = sc.broadast(sc.textFile("/refdata.csv")) 
case class refDataType (className:String,minValue:Int,maxValue:Int) 
//ref data, just 4 records 
val refRDD = refData.map(line=> line.split(",")).map(e => refDataType (e(0) , e(1).toInt, e(2).toInt)) 

我想我要在這裏寫一個UDF,但我不知道如何在UDF使用一個數據幀,或有任何方式做到這一點spark scala

+0

如果這4個條件總是相同的話,你可以編寫一個if-else if if-else UDF,而不是從該文件創建一個數據框 – philantrovert

+0

@philantrovert感謝您的關注,用戶可以更新它,因此它可能會有所不同。 – user3124284

+0

除了在這裏使用UDF之外,還有其他的方法嗎? – user3124284

回答

1

您可以使用textFile將文件作爲RDD讀取,因爲它非常小(可能根據您的要求廣播)。

通過收集RDD獲得陣列後,您可以創建一個Range,然後創建一個UDF來檢查您的值是否在該範圍內。

val rdd = sc.parallelize(Array(
("First",500,9999999), 
("Second",100,499), 
("Third",0,99), 
("Unknown",-99999,0) 
)) 

val dataArr = rdd.map{ case (className, min, max) => 
         (className, Range(min, max)) }.collect 
// First Element will be the Class Name 
// Second will be the Range(min, max) 
// sc.broadcast(dataArr) here 

val getClassName = udf {(x: Int) => { 
        dataArr.map{ e => 
         if (e._2.contains(x)) e._1.toString 
         else null.asInstanceOf[String] } 
        .filter(_ != null) 
        .apply(0) }} 

df.withColumn("ClassName", getClassName($"VALUE")).show 
+----+-----+---------+ 
|NAME|VALUE|ClassName| 
+----+-----+---------+ 
|Davi| 130| Second| 
|Joel| 20| Third| 
|Emma| 500| First| 
+----+-----+---------+ 

我很積極,可能會有更好的解決方案。

1

這裏最簡單的方法是閱讀既使用csv數據源,並使用標準SparkSQL加入他們,像這樣的文件:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType} 
val mainSchema = StructType(Seq(StructField("name", StringType, false), 
StructField("value", IntegerType, false))) 
val mainDf = spark.read.schema(mainSchema).csv("/tmp/b.txt") 
val lookupSchema = StructType(Seq(StructField("class_name", StringType, false), StructField("min_value", IntegerType, false), 
StructField("max_value", IntegerType, false))) 
val lookupDf = spark.read.schema(lookupSchema).csv("/tmp/a.txt") 
val result = mainDf.join(lookupDf, $"value" <= $"max_value" && $"value" > $"min_value") 
result.show() 

我不知道最高效的方式是否是此一個或一個由@philantrovert建議(這可能也取決於你使用的Spark版本)。你應該嘗試他們兩個,並決定自己。